You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "SuYan (JIRA)" <ji...@apache.org> on 2015/06/02 15:55:18 UTC

[jira] [Created] (SPARK-8044) Avoid using direct memory when putting or getting blocks from files

SuYan created SPARK-8044:
----------------------------

             Summary: Avoid using direct memory when putting or getting blocks from files
                 Key: SPARK-8044
                 URL: https://issues.apache.org/jira/browse/SPARK-8044
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core
    Affects Versions: 1.3.1
            Reporter: SuYan
            Priority: Critical


1.  I found that when we use getChannel() to put or get data with a heap (non-direct) ByteBuffer, FileChannel always copies the data through a temporary DirectBuffer, and we cannot control that allocation.

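According to the OpenJDK source code below, these temporary buffers are cached in a per-thread (ThreadLocal) pool of up to TEMP_BUF_POOL_SIZE (3) direct buffers held through SoftReferences. Nothing in this code frees them explicitly, so there is no guaranteed way to ensure the direct memory is released; it is reclaimed only once the garbage collector clears those references.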

{code}
 sun.nio.ch.FileChannelImpl.java
public int write(ByteBuffer src) throws IOException {
210         ensureOpen();
211         if (!writable)
212             throw new NonWritableChannelException();
213         synchronized (positionLock) {
214             int n = 0;
215             int ti = -1;
216             try {
217                 begin();
218                 if (!isOpen())
219                     return 0;
220                 ti = threads.add();
221                 if (appending)
222                     position(size());
223                 do {
224                     n = IOUtil.write(fd, src, -1, nd, positionLock);
225                 } while ((n == IOStatus.INTERRUPTED) && isOpen());
226                 return IOStatus.normalize(n);
227             } finally {
228                 threads.remove(ti);
229                 end(n > 0);
230                 assert IOStatus.check(n);
231             }
232         }
233     }
{code}
{code}
IOUtil.java

static int write(FileDescriptor fd, ByteBuffer src, long position,
74                      NativeDispatcher nd, Object lock)
75         throws IOException
76     {
77         if (src instanceof DirectBuffer)
78             return writeFromNativeBuffer(fd, src, position, nd, lock);
79 
80         // Substitute a native buffer
81         int pos = src.position();
82         int lim = src.limit();
83         assert (pos <= lim);
84         int rem = (pos <= lim ? lim - pos : 0);
85         ByteBuffer bb = null;
86         try {
87             bb = Util.getTemporaryDirectBuffer(rem);
88             bb.put(src);
89             bb.flip();
90             // Do not update src until we see how many bytes were written
91             src.position(pos);
92 
93             int n = writeFromNativeBuffer(fd, bb, position, nd, lock);
94             if (n > 0) {
95                 // now update src
96                 src.position(pos + n);
97             }
98             return n;
99         } finally {
100            Util.releaseTemporaryDirectBuffer(bb);
101        }
102    }
{code}
{code}
Util.java
     static ByteBuffer getTemporaryDirectBuffer(int size) {
61         ByteBuffer buf = null;
62         // Grab a buffer if available
63         for (int i=0; i<TEMP_BUF_POOL_SIZE; i++) {
64             SoftReference ref = (SoftReference)(bufferPool[i].get());
65             if ((ref != null) && ((buf = (ByteBuffer)ref.get()) != null) &&
66                 (buf.capacity() >= size)) {
67                 buf.rewind();
68                 buf.limit(size);
69                 bufferPool[i].set(null);
70                 return buf;
71             }
72         }
73 
74         // Make a new one
75         return ByteBuffer.allocateDirect(size);
76     }
{code}
{code}
 private static final int TEMP_BUF_POOL_SIZE = 3;
50 
51     // Per-thread soft cache of the last temporary direct buffer
52     private static ThreadLocal[] bufferPool;
53 
54     static {
55         bufferPool = new ThreadLocal[TEMP_BUF_POOL_SIZE];
56         for (int i=0; i<TEMP_BUF_POOL_SIZE; i++)
57             bufferPool[i] = new ThreadLocal();
58     }
59 
60     static ByteBuffer getTemporaryDirectBuffer(int size) {
61         ByteBuffer buf = null;
62         // Grab a buffer if available
63         for (int i=0; i<TEMP_BUF_POOL_SIZE; i++) {
64             SoftReference ref = (SoftReference)(bufferPool[i].get());
65             if ((ref != null) && ((buf = (ByteBuffer)ref.get()) != null) &&
66                 (buf.capacity() >= size)) {
67                 buf.rewind();
68                 buf.limit(size);
69                 bufferPool[i].set(null);
70                 return buf;
71             }
72         }
73 
74         // Make a new one
75         return ByteBuffer.allocateDirect(size);
76     }
77 
78     static void releaseTemporaryDirectBuffer(ByteBuffer buf) {
79         if (buf == null)
80             return;
81         // Put it in an empty slot if such exists
82         for (int i=0; i<TEMP_BUF_POOL_SIZE; i++) {
83             SoftReference ref = (SoftReference)(bufferPool[i].get());
84             if ((ref == null) || (ref.get() == null)) {
85                 bufferPool[i].set(new SoftReference(buf));
86                 return;
87             }
88         }
89         // Otherwise replace a smaller one in the cache if such exists
90         for (int i=0; i<TEMP_BUF_POOL_SIZE; i++) {
91             SoftReference ref = (SoftReference)(bufferPool[i].get());
92             ByteBuffer inCacheBuf = (ByteBuffer)ref.get();
93             if ((inCacheBuf == null) || (buf.capacity() > inCacheBuf.capacity())) {
94                 bufferPool[i].set(new SoftReference(buf));
95                 return;
96             }
97         }
98     }

{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org