You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "SuYan (JIRA)" <ji...@apache.org> on 2015/06/02 15:55:18 UTC
[jira] [Created] (SPARK-8044) Invoid use directMemory while put or
get block from file
SuYan created SPARK-8044:
----------------------------
Summary: Invoid use directMemory while put or get block from file
Key: SPARK-8044
URL: https://issues.apache.org/jira/browse/SPARK-8044
Project: Spark
Issue Type: Improvement
Components: Spark Core
Affects Versions: 1.3.1
Reporter: SuYan
Priority: Critical
1. I found if we use getChannel to put or get data, it will create DirectBuffer anyway, which is not controllable.
according openJDK source code: because it will create a ThreadLocal directBuffer pool, and is not provider a 100% percent way to sure the direct buffer to be released.
{code}
sun.nio.ch.FileChannelImpl.java
public int write(ByteBuffer src) throws IOException {
210 ensureOpen();
211 if (!writable)
212 throw new NonWritableChannelException();
213 synchronized (positionLock) {
214 int n = 0;
215 int ti = -1;
216 try {
217 begin();
218 if (!isOpen())
219 return 0;
220 ti = threads.add();
221 if (appending)
222 position(size());
223 do {
224 n = IOUtil.write(fd, src, -1, nd, positionLock);
225 } while ((n == IOStatus.INTERRUPTED) && isOpen());
226 return IOStatus.normalize(n);
227 } finally {
228 threads.remove(ti);
229 end(n > 0);
230 assert IOStatus.check(n);
231 }
232 }
233 }
{code}
{code}
IOUtil.java
static int write(FileDescriptor fd, ByteBuffer src, long position,
74 NativeDispatcher nd, Object lock)
75 throws IOException
76 {
77 if (src instanceof DirectBuffer)
78 return writeFromNativeBuffer(fd, src, position, nd, lock);
79
80 // Substitute a native buffer
81 int pos = src.position();
82 int lim = src.limit();
83 assert (pos <= lim);
84 int rem = (pos <= lim ? lim - pos : 0);
85 ByteBuffer bb = null;
86 try {
87 bb = Util.getTemporaryDirectBuffer(rem);
88 bb.put(src);
89 bb.flip();
90 // Do not update src until we see how many bytes were written
91 src.position(pos);
92
93 int n = writeFromNativeBuffer(fd, bb, position, nd, lock);
94 if (n > 0) {
95 // now update src
96 src.position(pos + n);
97 }
98 return n;
99 } finally {
100 Util.releaseTemporaryDirectBuffer(bb);
101 }
102 }
{code}
{code}
Util.java
static ByteBuffer getTemporaryDirectBuffer(int size) {
61 ByteBuffer buf = null;
62 // Grab a buffer if available
63 for (int i=0; i<TEMP_BUF_POOL_SIZE; i++) {
64 SoftReference ref = (SoftReference)(bufferPool[i].get());
65 if ((ref != null) && ((buf = (ByteBuffer)ref.get()) != null) &&
66 (buf.capacity() >= size)) {
67 buf.rewind();
68 buf.limit(size);
69 bufferPool[i].set(null);
70 return buf;
71 }
72 }
73
74 // Make a new one
75 return ByteBuffer.allocateDirect(size);
76 }
{code}
{code}
private static final int TEMP_BUF_POOL_SIZE = 3;
50
51 // Per-thread soft cache of the last temporary direct buffer
52 private static ThreadLocal[] bufferPool;
53
54 static {
55 bufferPool = new ThreadLocal[TEMP_BUF_POOL_SIZE];
56 for (int i=0; i<TEMP_BUF_POOL_SIZE; i++)
57 bufferPool[i] = new ThreadLocal();
58 }
59
60 static ByteBuffer getTemporaryDirectBuffer(int size) {
61 ByteBuffer buf = null;
62 // Grab a buffer if available
63 for (int i=0; i<TEMP_BUF_POOL_SIZE; i++) {
64 SoftReference ref = (SoftReference)(bufferPool[i].get());
65 if ((ref != null) && ((buf = (ByteBuffer)ref.get()) != null) &&
66 (buf.capacity() >= size)) {
67 buf.rewind();
68 buf.limit(size);
69 bufferPool[i].set(null);
70 return buf;
71 }
72 }
73
74 // Make a new one
75 return ByteBuffer.allocateDirect(size);
76 }
77
78 static void releaseTemporaryDirectBuffer(ByteBuffer buf) {
79 if (buf == null)
80 return;
81 // Put it in an empty slot if such exists
82 for (int i=0; i<TEMP_BUF_POOL_SIZE; i++) {
83 SoftReference ref = (SoftReference)(bufferPool[i].get());
84 if ((ref == null) || (ref.get() == null)) {
85 bufferPool[i].set(new SoftReference(buf));
86 return;
87 }
88 }
89 // Otherwise replace a smaller one in the cache if such exists
90 for (int i=0; i<TEMP_BUF_POOL_SIZE; i++) {
91 SoftReference ref = (SoftReference)(bufferPool[i].get());
92 ByteBuffer inCacheBuf = (ByteBuffer)ref.get();
93 if ((inCacheBuf == null) || (buf.capacity() > inCacheBuf.capacity())) {
94 bufferPool[i].set(new SoftReference(buf));
95 return;
96 }
97 }
98 }
{code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org