You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Apache Spark (JIRA)" <ji...@apache.org> on 2015/06/02 15:56:18 UTC
[jira] [Commented] (SPARK-8044) Avoid using direct memory when putting
or getting a block from a file
[ https://issues.apache.org/jira/browse/SPARK-8044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14569124#comment-14569124 ]
Apache Spark commented on SPARK-8044:
-------------------------------------
User 'suyanNone' has created a pull request for this issue:
https://github.com/apache/spark/pull/6586
> Avoid using direct memory when putting or getting a block from a file
> ---------------------------------------------------------------------
>
> Key: SPARK-8044
> URL: https://issues.apache.org/jira/browse/SPARK-8044
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 1.3.1
> Reporter: SuYan
> Priority: Critical
>
> 1. I found that if we use getChannel to put or get data, a DirectBuffer is created anyway, and we have no control over it.
> According to the OpenJDK source code, this is because it keeps a ThreadLocal pool of direct buffers and does not provide a 100% reliable way to ensure that those direct buffers are released.
> {code}
> sun.nio.ch.FileChannelImpl.java
> public int write(ByteBuffer src) throws IOException {
> 210 ensureOpen();
> 211 if (!writable)
> 212 throw new NonWritableChannelException();
> 213 synchronized (positionLock) {
> 214 int n = 0;
> 215 int ti = -1;
> 216 try {
> 217 begin();
> 218 if (!isOpen())
> 219 return 0;
> 220 ti = threads.add();
> 221 if (appending)
> 222 position(size());
> 223 do {
> 224 n = IOUtil.write(fd, src, -1, nd, positionLock);
> 225 } while ((n == IOStatus.INTERRUPTED) && isOpen());
> 226 return IOStatus.normalize(n);
> 227 } finally {
> 228 threads.remove(ti);
> 229 end(n > 0);
> 230 assert IOStatus.check(n);
> 231 }
> 232 }
> 233 }
> {code}
> {code}
> IOUtil.java
> static int write(FileDescriptor fd, ByteBuffer src, long position,
> 74 NativeDispatcher nd, Object lock)
> 75 throws IOException
> 76 {
> 77 if (src instanceof DirectBuffer)
> 78 return writeFromNativeBuffer(fd, src, position, nd, lock);
> 79
> 80 // Substitute a native buffer
> 81 int pos = src.position();
> 82 int lim = src.limit();
> 83 assert (pos <= lim);
> 84 int rem = (pos <= lim ? lim - pos : 0);
> 85 ByteBuffer bb = null;
> 86 try {
> 87 bb = Util.getTemporaryDirectBuffer(rem);
> 88 bb.put(src);
> 89 bb.flip();
> 90 // Do not update src until we see how many bytes were written
> 91 src.position(pos);
> 92
> 93 int n = writeFromNativeBuffer(fd, bb, position, nd, lock);
> 94 if (n > 0) {
> 95 // now update src
> 96 src.position(pos + n);
> 97 }
> 98 return n;
> 99 } finally {
> 100 Util.releaseTemporaryDirectBuffer(bb);
> 101 }
> 102 }
> {code}
> {code}
> Util.java
> static ByteBuffer getTemporaryDirectBuffer(int size) {
> 61 ByteBuffer buf = null;
> 62 // Grab a buffer if available
> 63 for (int i=0; i<TEMP_BUF_POOL_SIZE; i++) {
> 64 SoftReference ref = (SoftReference)(bufferPool[i].get());
> 65 if ((ref != null) && ((buf = (ByteBuffer)ref.get()) != null) &&
> 66 (buf.capacity() >= size)) {
> 67 buf.rewind();
> 68 buf.limit(size);
> 69 bufferPool[i].set(null);
> 70 return buf;
> 71 }
> 72 }
> 73
> 74 // Make a new one
> 75 return ByteBuffer.allocateDirect(size);
> 76 }
> {code}
> {code}
> private static final int TEMP_BUF_POOL_SIZE = 3;
> 50
> 51 // Per-thread soft cache of the last temporary direct buffer
> 52 private static ThreadLocal[] bufferPool;
> 53
> 54 static {
> 55 bufferPool = new ThreadLocal[TEMP_BUF_POOL_SIZE];
> 56 for (int i=0; i<TEMP_BUF_POOL_SIZE; i++)
> 57 bufferPool[i] = new ThreadLocal();
> 58 }
> 59
> 60 static ByteBuffer getTemporaryDirectBuffer(int size) {
> 61 ByteBuffer buf = null;
> 62 // Grab a buffer if available
> 63 for (int i=0; i<TEMP_BUF_POOL_SIZE; i++) {
> 64 SoftReference ref = (SoftReference)(bufferPool[i].get());
> 65 if ((ref != null) && ((buf = (ByteBuffer)ref.get()) != null) &&
> 66 (buf.capacity() >= size)) {
> 67 buf.rewind();
> 68 buf.limit(size);
> 69 bufferPool[i].set(null);
> 70 return buf;
> 71 }
> 72 }
> 73
> 74 // Make a new one
> 75 return ByteBuffer.allocateDirect(size);
> 76 }
> 77
> 78 static void releaseTemporaryDirectBuffer(ByteBuffer buf) {
> 79 if (buf == null)
> 80 return;
> 81 // Put it in an empty slot if such exists
> 82 for (int i=0; i<TEMP_BUF_POOL_SIZE; i++) {
> 83 SoftReference ref = (SoftReference)(bufferPool[i].get());
> 84 if ((ref == null) || (ref.get() == null)) {
> 85 bufferPool[i].set(new SoftReference(buf));
> 86 return;
> 87 }
> 88 }
> 89 // Otherwise replace a smaller one in the cache if such exists
> 90 for (int i=0; i<TEMP_BUF_POOL_SIZE; i++) {
> 91 SoftReference ref = (SoftReference)(bufferPool[i].get());
> 92 ByteBuffer inCacheBuf = (ByteBuffer)ref.get();
> 93 if ((inCacheBuf == null) || (buf.capacity() > inCacheBuf.capacity())) {
> 94 bufferPool[i].set(new SoftReference(buf));
> 95 return;
> 96 }
> 97 }
> 98 }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org