You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/01/07 01:17:18 UTC

svn commit: r1056135 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/proto/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ apollo-jdbm2/src/main/scala/o...

Author: chirino
Date: Fri Jan  7 00:17:17 2011
New Revision: 1056135

URL: http://svn.apache.org/viewvc?rev=1056135&view=rev
Log:
initial pass at supporting zero copy buffers between stomp and jdbm2 store.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala
    activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
    activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
    activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/dto/JDBM2StoreDTO.java
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto?rev=1056135&r1=1056134&r2=1056135&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto Fri Jan  7 00:17:17 2011
@@ -27,8 +27,8 @@ message MessagePB {
   required bytes protocol = 2 [java_override_type = "AsciiBuffer"];
   required int32 size = 3;
   optional bytes value = 4;
-  optional int64 streamKey = 5;
-  optional int64 expiration = 6;
+  optional bool  direct = 5;
+  optional sint64 expiration = 6;
 }
 
 message QueuePB {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1056135&r1=1056134&r2=1056135&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Fri Jan  7 00:17:17 2011
@@ -139,7 +139,7 @@ object Delivery extends Sizer[Delivery] 
   def size(value:Delivery):Int = value.size
 }
 
-class Delivery extends BaseRetained {
+class Delivery {
 
   /**
    * Total size of the delivery.  Used for resource allocation tracking

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1056135&r1=1056134&r2=1056135&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Fri Jan  7 00:17:17 2011
@@ -981,8 +981,6 @@ class QueueEntry(val queue:Queue, val se
         queue.swap_out_size_counter += size
         queue.swap_out_item_counter += 1
 
-        delivery.message.release
-
         state = new Swapped(delivery.storeKey, size)
         if( can_combine_with_prev ) {
           getPrevious.as_swapped_range.combineNext

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala?rev=1056135&r1=1056134&r2=1056135&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala Fri Jan  7 00:17:17 2011
@@ -21,6 +21,8 @@ import java.nio.channels.{FileChannel, W
 import java.nio.ByteBuffer
 import java.io._
 import org.apache.activemq.apollo.util._
+import org.fusesource.hawtdispatch.internal.DispatcherConfig
+
 /**
  * <p>
  * </p>
@@ -120,7 +122,7 @@ trait FileDirectBufferTrait extends Dire
   }
 }
 
-case class Allocation(size:Long, offset:Long) extends Ordered[Allocation] {
+case class Allocation(offset:Long, size:Long) extends Ordered[Allocation] {
 
   var _free_func: (Allocation)=>Unit = _
 
@@ -188,29 +190,64 @@ class TreeAllocator(range:Allocation) ex
     }
 
     val allocation = spot_entry.getKey
-    free_by_size.remove(allocation)
+    free_by_size.removeEntry(spot_entry)
+    free_by_offset.remove(allocation.offset)
 
     // might be the perfect size
-    if( allocation.size == request ) {
-      allocation._free_func = free
+    val rc = if( allocation.size == request ) {
       allocation
     } else {
       // split the allocation..
       var (first, second) = allocation.split(request)
 
-      free_by_offset.remove(first.offset)
-      free_by_offset.put(second.offset, second)
-
       // put the free part in the free map.
+      free_by_offset.put(second.offset, second)
       free_by_size.put(second, null)
 
-      first._free_func = free
       first
     }
+    rc._free_func = free
+    rc
   }
 
-  def free(allocation:Allocation):Unit = {
+  def alloc_at(req:Allocation):Boolean = {
+    var spot_entry = free_by_offset.floorEntry(req.offset)
+    if( spot_entry== null ) {
+      return false
+    }
+
+    var spot = spot_entry.getValue
+    if( spot.offset+spot.size < req.offset+req.size ) {
+      return false
+    }
+
+    free_by_offset.removeEntry(spot_entry)
+    free_by_size.remove(spot)
+
+    // only need to put back if it was not exactly what we need.
+    if( spot != req ) {
+
+      // deal with excess at the front
+      if( spot.offset != req.offset ) {
+        val (prev, next) = spot.split(req.offset - spot.offset)
+        free_by_offset.put(prev.offset, prev)
+        free_by_size.put(prev, null)
+        spot = next
+      }
+
+      // deal with excess at the rear
+      if( spot.size != req.size ) {
+        val (prev, next) = spot.split(req.size)
+        free_by_offset.put(next.offset, next)
+        free_by_size.put(next, null)
+      }
+    }
 
+    req._free_func = free
+    true
+  }
+
+  def free(allocation:Allocation):Unit = {
 
     var prev_e = free_by_offset.floorEntry(allocation.offset)
     var next_e = if( prev_e!=null ) {
@@ -296,13 +333,34 @@ class ActiveAllocator(val range:Allocati
 class FileDirectBufferAllocator(val directory:File) extends DirectBufferAllocator {
 
   // we use thread local allocators to
-  class AllocatorContext(val queue:DispatchQueue) {
+  class AllocatorContext(val id:Int) {
 
     val allocator = new TreeAllocator(Allocation(0, Long.MaxValue))
-    var channel:FileChannel = new RandomAccessFile(queue.getLabel, "rw").getChannel
+    var channel:FileChannel = new RandomAccessFile(new File(directory, ""+id+".data"), "rw").getChannel
+    var queue:DispatchQueue = _
+
+    var last_sync_size = channel.size
+    @volatile
+    var current_size = last_sync_size
+
+    def size_changed = this.synchronized {
+      val t = current_size
+      if( t != last_sync_size ) {
+        last_sync_size = t
+        true
+      } else {
+        false
+      }
+    }
+
+    def sync = {
+      channel.force(size_changed)
+    }
 
     class AllocationBuffer(val allocation:Allocation) extends BaseRetained with FileDirectBufferTrait {
       def channel: FileChannel = AllocatorContext.this.channel
+
+      def file = id
       def offset: Long = allocation.offset
       def size: Int = allocation.size.toInt
 
@@ -316,24 +374,74 @@ class FileDirectBufferAllocator(val dire
       }
     }
 
-    def alloc(size: Int): DirectBuffer = with_allocator_context { ctx=>
+    def alloc(size: Int): DirectBuffer = current_context { ctx=>
       val allocation = allocator.alloc(size)
       assert(allocation!=null)
+      current_size = current_size.max(allocation.offset + allocation.size)
       new AllocationBuffer(allocation)
     }
   }
 
+  def to_alloc_buffer(buffer:DirectBuffer) = buffer.asInstanceOf[AllocatorContext#AllocationBuffer]
+
   val _current_allocator_context = new ThreadLocal[AllocatorContext]()
+  var contexts = Map[Int, AllocatorContext]()
 
-  protected def start() = {
+  def start() = {
     directory.mkdirs
+    val config = new DispatcherConfig()
+    for( i <- 0 until config.getThreads ) {
+      val ctx = new AllocatorContext(i)
+      contexts += i->ctx
+      getThreadQueue(i) {
+        ctx.queue = getCurrentThreadQueue
+        _current_allocator_context.set(ctx)
+      }
+    }
   }
 
-  def alloc(size: Int): DirectBuffer = with_allocator_context { ctx=>
+  def stop() = {
+    val config = new DispatcherConfig()
+    for( i <- 0 until config.getThreads ) {
+      contexts = Map()
+      getThreadQueue(i) {
+        _current_allocator_context.remove
+      }
+    }
+  }
+
+  def sync(file: Int) = {
+    contexts.get(file).get.sync
+  }
+
+  def alloc(size: Int): DirectBuffer = current_context { ctx=>
     ctx.alloc(size)
   }
 
-  def with_allocator_context[T](func: (AllocatorContext)=>T):T = {
+  def alloc_at(file:Int, offset:Long, size:Int):Unit = context(file) { ctx=>
+    ctx.allocator.alloc_at(Allocation(offset, size))
+  }
+
+  def free(file:Int, offset:Long, size:Int):Unit = context(file) { ctx=>
+    ctx.allocator.free(Allocation(offset, size))
+  }
+
+  def view_buffer(file:Int, the_offset:Long, the_size:Int):DirectBuffer = {
+    val the_channel = contexts.get(file).get.channel
+    new BaseRetained with FileDirectBufferTrait {
+      def offset: Long = the_offset
+      def size: Int = the_size
+      val channel: FileChannel = the_channel
+    }
+  }
+
+  def context(i:Int)(func: (AllocatorContext)=>Unit):Unit= {
+    getThreadQueue(i) {
+      func(current_allocator_context)
+    }
+  }
+
+  def current_context[T](func: (AllocatorContext)=>T):T = {
     if( getCurrentThreadQueue == null ) {
       getGlobalQueue().future(func(current_allocator_context))()
     } else {
@@ -341,14 +449,6 @@ class FileDirectBufferAllocator(val dire
     }
   }
 
-  def current_allocator_context:AllocatorContext = {
-    val thread_queue = getCurrentThreadQueue
-    assert(thread_queue != null)
-    var rc = _current_allocator_context.get
-    if( rc==null ) {
-      rc = new AllocatorContext(thread_queue)
-      _current_allocator_context.set(rc)
-    }
-    rc
-  }
+  def current_allocator_context:AllocatorContext = _current_allocator_context.get
+
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala?rev=1056135&r1=1056134&r2=1056135&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala Fri Jan  7 00:17:17 2011
@@ -30,13 +30,12 @@ import java.util.Comparator
 import jdbm.helper._
 import PBSupport._
 import org.fusesource.hawtbuf.proto.PBMessageFactory
-import java.io.{EOFException, InputStream, OutputStream, Serializable}
-
+import java.io._
 object JDBM2Client extends Log {
 
-  object MessageRecordSerializer extends Serializer[MessageRecord] {
-    def serialize(out: SerializerOutput, v: MessageRecord) = encode_message_record(out, v)
-    def deserialize(in: SerializerInput) = decode_message_record(in)
+  object MessageRecordSerializer extends Serializer[MessagePB.Buffer] {
+    def serialize(out: SerializerOutput, v: MessagePB.Buffer) = v.writeUnframed(out)
+    def deserialize(in: SerializerInput) = MessagePB.FACTORY.parseUnframed(in).freeze
   }
 
   object QueueRecordSerializer extends Serializer[QueueRecord] {
@@ -49,6 +48,18 @@ object JDBM2Client extends Log {
     def deserialize(in: SerializerInput) = decode_queue_entry_record(in)
   }
 
+  object LobValueSerializer extends Serializer[(Int, Long, Int)] {
+    def serialize(out: SerializerOutput, v: (Int,Long, Int)) = {
+      out.writePackedInt(v._1)
+      out.writePackedLong(v._2)
+      out.writePackedInt(v._3)
+    }
+
+    def deserialize(in: SerializerInput) = {
+      (in.readPackedInt, in.readPackedLong, in.readPackedInt)
+    }
+  }
+
   object QueueEntryKeySerializer extends Serializer[(Long,Long)] {
     def serialize(out: SerializerOutput, v: (Long,Long)) = {
       out.writePackedLong(v._1)
@@ -136,17 +147,30 @@ class JDBM2Client(store: JDBM2Store) {
 
   var queues_db:HTree[Long, QueueRecord] = _
   var entries_db:BTree[(Long,Long), QueueEntryRecord] = _
-  var messages_db:HTree[Long, MessageRecord] = _
+  var messages_db:HTree[Long, MessagePB.Buffer] = _
+  var lobs_db:HTree[Long, (Int, Long, Int)] = _
   var message_refs_db:HTree[Long, java.lang.Integer] = _
 
   var last_message_key = 0L
   var last_queue_key = 0L
 
+  var direct_buffer_allocator: FileDirectBufferAllocator = _
+
+  def zero_copy_dir = {
+    import FileSupport._
+    config.directory / "zerocp"
+  }
+
   def start() = {
+    import FileSupport._
 
     config.directory.mkdirs
 
-    import FileSupport._
+    if( Option(config.zero_copy).map(_.booleanValue).getOrElse(false) ) {
+      direct_buffer_allocator = new FileDirectBufferAllocator(zero_copy_dir)
+      direct_buffer_allocator.start
+    }
+
     recman = RecordManagerFactory.createRecordManager((config.directory / "jdbm2").getCanonicalPath)
 
     def init_btree[K, V](name: String, key_comparator:Comparator[K]=ComparableComparator.INSTANCE.asInstanceOf[Comparator[K]], key_serializer:Serializer[K]=null, value_serializer:Serializer[V]=null) = {
@@ -175,15 +199,22 @@ class JDBM2Client(store: JDBM2Store) {
       rc
     }
 
-
     transaction {
       messages_db = init_htree("messages", value_serializer = MessageRecordSerializer)
+      lobs_db = init_htree("lobs", value_serializer = LobValueSerializer)
       message_refs_db = init_htree("message_refs")
       queues_db = init_htree("queues", value_serializer = QueueRecordSerializer)
       entries_db = init_btree("enttries", new QueueEntryKeyComparator, QueueEntryKeySerializer, QueueEntryRecordSerializer)
 
       last_message_key = Option(recman.getNamedObject("last_message_key")).map(_.longValue).getOrElse(0L)
       last_queue_key = Option(recman.getNamedObject("last_queue_key")).map(_.longValue).getOrElse(0L)
+
+      if( direct_buffer_allocator!=null ) {
+        lobs_db.cursor { (_,v)=>
+          direct_buffer_allocator.alloc_at(v._1, v._2, v._3)
+          true
+        }
+      }
     }
 
   }
@@ -191,6 +222,10 @@ class JDBM2Client(store: JDBM2Store) {
   def stop() = {
     recman.close
     recman = null;
+    if( direct_buffer_allocator!=null ) {
+      direct_buffer_allocator.stop
+      direct_buffer_allocator = null
+    }
   }
 
   def transaction[T](func: => T): T = {
@@ -214,6 +249,7 @@ class JDBM2Client(store: JDBM2Store) {
       if( config.directory.isDirectory ) {
         config.directory.listFiles.filter(_.getName.startsWith("jdbm2.")).foreach(_.delete)
       }
+      zero_copy_dir.delete
     }
     if( recman!=null ) {
       stop
@@ -267,6 +303,12 @@ class JDBM2Client(store: JDBM2Store) {
       gc.foreach { key=>
         message_refs_db.remove(key)
         messages_db.remove(key)
+        if( direct_buffer_allocator!=null ){
+          val location = lobs_db.find(key)
+          if( location!=null ) {
+            direct_buffer_allocator.free(location._1, location._2, location._3)
+          }
+        }
       }
     }
     recman.defrag
@@ -304,11 +346,25 @@ class JDBM2Client(store: JDBM2Store) {
 
   def store(uows: Seq[JDBM2Store#DelayableUOW], callback:Runnable) {
     transaction {
+      var needs_direct_buffer_sync = Set[Int]()
       uows.foreach { uow =>
         uow.actions.foreach { case (msg, action) =>
 
-          if (action.messageRecord != null) {
-            messages_db.put(action.messageRecord.key, action.messageRecord)
+          val message_record = action.messageRecord
+          if (message_record != null) {
+
+            val pb = if( message_record.direct_buffer != null ) {
+              val r = to_pb(action.messageRecord).copy
+              val buffer = direct_buffer_allocator.to_alloc_buffer(message_record.direct_buffer)
+              lobs_db.put(message_record.key, (buffer.file, buffer.offset, buffer.size))
+              needs_direct_buffer_sync += buffer.file
+              r.setDirect(true)
+              r.freeze
+            } else {
+              to_pb(action.messageRecord)
+            }
+
+            messages_db.put(action.messageRecord.key, pb)
             if( action.messageRecord.key > last_message_key ) {
               last_message_key = action.messageRecord.key
               recman.setNamedObject("last_message_key", last_message_key)
@@ -327,6 +383,9 @@ class JDBM2Client(store: JDBM2Store) {
 
         }
       }
+      if( direct_buffer_allocator!=null ) {
+        needs_direct_buffer_sync.foreach(direct_buffer_allocator.sync(_))
+      }
     }
     callback.run
   }
@@ -385,23 +444,19 @@ class JDBM2Client(store: JDBM2Store) {
   var metric_load_from_index = metric_load_from_index_counter(false)
 
   def loadMessages(requests: ListBuffer[(Long, (Option[MessageRecord])=>Unit)]) = {
-    val records =  requests.flatMap { case (message_key, callback)=>
+    requests.foreach { case (message_key, callback)=>
       val record = metric_load_from_index_counter.time {
-        Option(messages_db.find(message_key))
-      }
-      record match {
-        case None =>
-        debug("Message not indexed: %s", message_key)
-        callback(None)
-        None
-        case Some(x) => Some((record, callback))
+        Option(messages_db.find(message_key)).map{ pb=>
+          val rc = from_pb(pb)
+          if( pb.getDirect ) {
+            val location = lobs_db.find(message_key)
+            rc.direct_buffer = direct_buffer_allocator.view_buffer(location._1, location._2, location._3)
+          }
+          rc
+        }
       }
+      callback(record)
     }
-
-    records.foreach { case (record, callback)=>
-      callback( record )
-    }
-
   }
 
 

Modified: activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala?rev=1056135&r1=1056134&r2=1056135&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala Fri Jan  7 00:17:17 2011
@@ -80,6 +80,8 @@ class JDBM2Store extends DelayingStoreSu
   
   protected def get_next_msg_key = next_msg_key.getAndIncrement
 
+  override def direct_buffer_allocator():DirectBufferAllocator = client.direct_buffer_allocator
+
   protected def store(uows: Seq[DelayableUOW])(callback: =>Unit) = {
     executor {
       client.store(uows, ^{

Modified: activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/dto/JDBM2StoreDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/dto/JDBM2StoreDTO.java?rev=1056135&r1=1056134&r2=1056135&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/dto/JDBM2StoreDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/dto/JDBM2StoreDTO.java Fri Jan  7 00:17:17 2011
@@ -34,7 +34,9 @@ public class JDBM2StoreDTO extends Store
     @XmlAttribute
     public File directory;
 
-    @XmlAttribute
+    @XmlAttribute(name="compact_interval")
     public Integer compact_interval;
 
+    @XmlAttribute(name="zero_copy")
+    public Boolean zero_copy;
 }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1056135&r1=1056134&r2=1056135&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala Fri Jan  7 00:17:17 2011
@@ -348,6 +348,8 @@ class StompCodec extends ProtocolCodec w
         val count = read_direct.write(read_channel, read_direct_pos)
         if (count == -1) {
             throw new EOFException("Peer disconnected")
+        } else if (count == 0) {
+            return null
         }
         read_direct_pos += count
       } else if (read_end == read_buffer.position() ) {