You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Apache Spark (JIRA)" <ji...@apache.org> on 2015/06/02 15:56:18 UTC

[jira] [Assigned] (SPARK-8044) Avoid using directMemory when putting or getting a block from a file

     [ https://issues.apache.org/jira/browse/SPARK-8044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-8044:
-----------------------------------

    Assignee:     (was: Apache Spark)

> Avoid using directMemory when putting or getting a block from a file
> --------------------------------------------------------------------
>
>                 Key: SPARK-8044
>                 URL: https://issues.apache.org/jira/browse/SPARK-8044
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 1.3.1
>            Reporter: SuYan
>            Priority: Critical
>
> 1.  I found that if we use getChannel to put or get data, it will create a DirectBuffer anyway, which is not controllable.
> According to the OpenJDK source code, this is because it creates a ThreadLocal direct buffer pool and does not provide a 100% reliable way to ensure that the direct buffers are released.
> {code}
>  sun.nio.ch.FileChannelImpl.java
> public int write(ByteBuffer src) throws IOException {
> 210         ensureOpen();
> 211         if (!writable)
> 212             throw new NonWritableChannelException();
> 213         synchronized (positionLock) {
> 214             int n = 0;
> 215             int ti = -1;
> 216             try {
> 217                 begin();
> 218                 if (!isOpen())
> 219                     return 0;
> 220                 ti = threads.add();
> 221                 if (appending)
> 222                     position(size());
> 223                 do {
> 224                     n = IOUtil.write(fd, src, -1, nd, positionLock);
> 225                 } while ((n == IOStatus.INTERRUPTED) && isOpen());
> 226                 return IOStatus.normalize(n);
> 227             } finally {
> 228                 threads.remove(ti);
> 229                 end(n > 0);
> 230                 assert IOStatus.check(n);
> 231             }
> 232         }
> 233     }
> {code}
> {code}
> IOUtil.java
> static int write(FileDescriptor fd, ByteBuffer src, long position,
> 74                      NativeDispatcher nd, Object lock)
> 75         throws IOException
> 76     {
> 77         if (src instanceof DirectBuffer)
> 78             return writeFromNativeBuffer(fd, src, position, nd, lock);
> 79 
> 80         // Substitute a native buffer
> 81         int pos = src.position();
> 82         int lim = src.limit();
> 83         assert (pos <= lim);
> 84         int rem = (pos <= lim ? lim - pos : 0);
> 85         ByteBuffer bb = null;
> 86         try {
> 87             bb = Util.getTemporaryDirectBuffer(rem);
> 88             bb.put(src);
> 89             bb.flip();
> 90             // Do not update src until we see how many bytes were written
> 91             src.position(pos);
> 92 
> 93             int n = writeFromNativeBuffer(fd, bb, position, nd, lock);
> 94             if (n > 0) {
> 95                 // now update src
> 96                 src.position(pos + n);
> 97             }
> 98             return n;
> 99         } finally {
> 100            Util.releaseTemporaryDirectBuffer(bb);
> 101        }
> 102    }
> {code}
> {code}
> Util.java
>      static ByteBuffer getTemporaryDirectBuffer(int size) {
> 61         ByteBuffer buf = null;
> 62         // Grab a buffer if available
> 63         for (int i=0; i<TEMP_BUF_POOL_SIZE; i++) {
> 64             SoftReference ref = (SoftReference)(bufferPool[i].get());
> 65             if ((ref != null) && ((buf = (ByteBuffer)ref.get()) != null) &&
> 66                 (buf.capacity() >= size)) {
> 67                 buf.rewind();
> 68                 buf.limit(size);
> 69                 bufferPool[i].set(null);
> 70                 return buf;
> 71             }
> 72         }
> 73 
> 74         // Make a new one
> 75         return ByteBuffer.allocateDirect(size);
> 76     }
> {code}
> {code}
>  private static final int TEMP_BUF_POOL_SIZE = 3;
> 50 
> 51     // Per-thread soft cache of the last temporary direct buffer
> 52     private static ThreadLocal[] bufferPool;
> 53 
> 54     static {
> 55         bufferPool = new ThreadLocal[TEMP_BUF_POOL_SIZE];
> 56         for (int i=0; i<TEMP_BUF_POOL_SIZE; i++)
> 57             bufferPool[i] = new ThreadLocal();
> 58     }
> 59 
> 60     static ByteBuffer getTemporaryDirectBuffer(int size) {
> 61         ByteBuffer buf = null;
> 62         // Grab a buffer if available
> 63         for (int i=0; i<TEMP_BUF_POOL_SIZE; i++) {
> 64             SoftReference ref = (SoftReference)(bufferPool[i].get());
> 65             if ((ref != null) && ((buf = (ByteBuffer)ref.get()) != null) &&
> 66                 (buf.capacity() >= size)) {
> 67                 buf.rewind();
> 68                 buf.limit(size);
> 69                 bufferPool[i].set(null);
> 70                 return buf;
> 71             }
> 72         }
> 73 
> 74         // Make a new one
> 75         return ByteBuffer.allocateDirect(size);
> 76     }
> 77 
> 78     static void releaseTemporaryDirectBuffer(ByteBuffer buf) {
> 79         if (buf == null)
> 80             return;
> 81         // Put it in an empty slot if such exists
> 82         for (int i=0; i<TEMP_BUF_POOL_SIZE; i++) {
> 83             SoftReference ref = (SoftReference)(bufferPool[i].get());
> 84             if ((ref == null) || (ref.get() == null)) {
> 85                 bufferPool[i].set(new SoftReference(buf));
> 86                 return;
> 87             }
> 88         }
> 89         // Otherwise replace a smaller one in the cache if such exists
> 90         for (int i=0; i<TEMP_BUF_POOL_SIZE; i++) {
> 91             SoftReference ref = (SoftReference)(bufferPool[i].get());
> 92             ByteBuffer inCacheBuf = (ByteBuffer)ref.get();
> 93             if ((inCacheBuf == null) || (buf.capacity() > inCacheBuf.capacity())) {
> 94                 bufferPool[i].set(new SoftReference(buf));
> 95                 return;
> 96             }
> 97         }
> 98     }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org