You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dm...@apache.org on 2016/06/06 07:36:38 UTC
[05/13] ignite git commit: IGNITE-3228: Hadoop: workaround/fix for
inefficient memory usage.
IGNITE-3228: Hadoop: workaround/fix for inefficient memory usage.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8ea0598c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8ea0598c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8ea0598c
Branch: refs/heads/master
Commit: 8ea0598c6b5ce9e81ccd114df7246ba17e26817e
Parents: 91862c7
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Thu Jun 2 09:11:09 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Sun Jun 5 21:14:44 2016 +0300
----------------------------------------------------------------------
.../shuffle/collections/HadoopMultimapBase.java | 90 +++++++++++++++++---
1 file changed, 76 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/8ea0598c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java
index e6995ca..7dcff3d 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java
@@ -30,7 +30,6 @@ import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopDataInStream;
import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopDataOutStream;
import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopOffheapBuffer;
-import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
import org.jetbrains.annotations.Nullable;
@@ -48,7 +47,7 @@ public abstract class HadoopMultimapBase implements HadoopMultimap {
protected final int pageSize;
/** */
- private final Collection<GridLongList> allPages = new ConcurrentLinkedQueue<>();
+ private final Collection<Page> allPages = new ConcurrentLinkedQueue<>();
/**
* @param jobInfo Job info.
@@ -64,11 +63,12 @@ public abstract class HadoopMultimapBase implements HadoopMultimap {
}
/**
- * @param ptrs Page pointers.
+ * @param page Page.
*/
- private void deallocate(GridLongList ptrs) {
- while (!ptrs.isEmpty())
- mem.release(ptrs.remove(), ptrs.remove());
+ private void deallocate(Page page) {
+ assert page != null;
+
+ mem.release(page.ptr, page.size);
}
/**
@@ -105,8 +105,8 @@ public abstract class HadoopMultimapBase implements HadoopMultimap {
/** {@inheritDoc} */
@Override public void close() {
- for (GridLongList list : allPages)
- deallocate(list);
+ for (Page page : allPages)
+ deallocate(page);
}
/**
@@ -190,8 +190,8 @@ public abstract class HadoopMultimapBase implements HadoopMultimap {
/** */
private long writeStart;
- /** Size and pointer pairs list. */
- private final GridLongList pages = new GridLongList(16);
+ /** Current page. */
+ private Page curPage;
/**
* @param ctx Task context.
@@ -222,11 +222,10 @@ public abstract class HadoopMultimapBase implements HadoopMultimap {
private long allocateNextPage(long requestedSize) {
int writtenSize = writtenSize();
- long newPageSize = Math.max(writtenSize + requestedSize, pageSize);
+ long newPageSize = nextPageSize(writtenSize + requestedSize);
long newPagePtr = mem.allocate(newPageSize);
- pages.add(newPageSize);
- pages.add(newPagePtr);
+ System.out.println("ALLOCATED: " + newPageSize);
HadoopOffheapBuffer b = out.buffer();
@@ -240,10 +239,50 @@ public abstract class HadoopMultimapBase implements HadoopMultimap {
writeStart = newPagePtr;
+ // At this point old page is not needed, so we release it.
+ Page oldPage = curPage;
+
+ curPage = new Page(newPagePtr, newPageSize);
+
+ if (oldPage != null)
+ allPages.add(oldPage);
+
return b.move(requestedSize);
}
/**
+ * Get next page size.
+ *
+ * @param required Required amount of data.
+ * @return Next page size.
+ */
+ private long nextPageSize(long required) {
+ long pages = (required / pageSize) + 1;
+
+ long pagesPow2 = nextPowerOfTwo(pages);
+
+ return pagesPow2 * pageSize;
+ }
+
+ /**
+ * Get next power of two which greater or equal to the given number. Naive implementation.
+ *
+ * @param val Number
+ * @return Nearest pow2.
+ */
+ private long nextPowerOfTwo(long val) {
+ long res = 1;
+
+ while (res < val)
+ res = res << 1;
+
+ if (res < 0)
+ throw new IllegalArgumentException("Value is too big to find positive pow2: " + val);
+
+ return res;
+ }
+
+ /**
* @return Fixed pointer.
*/
private long fixAlignment() {
@@ -317,7 +356,8 @@ public abstract class HadoopMultimapBase implements HadoopMultimap {
/** {@inheritDoc} */
@Override public void close() throws IgniteCheckedException {
- allPages.add(pages);
+ if (curPage != null)
+ allPages.add(curPage);
keySer.close();
valSer.close();
@@ -372,4 +412,26 @@ public abstract class HadoopMultimapBase implements HadoopMultimap {
throw new UnsupportedOperationException();
}
}
+
+ /**
+ * Page.
+ */
+ private static class Page {
+ /** Pointer. */
+ private final long ptr;
+
+ /** Size. */
+ private final long size;
+
+ /**
+ * Constructor.
+ *
+ * @param ptr Pointer.
+ * @param size Size.
+ */
+ public Page(long ptr, long size) {
+ this.ptr = ptr;
+ this.size = size;
+ }
+ }
}
\ No newline at end of file