You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@phoenix.apache.org by la...@apache.org on 2019/02/06 21:23:15 UTC
[phoenix] branch 4.x-HBase-1.3 updated: PHOENIX-5123 Avoid using MappedByteBuffers for server side GROUP BY.
This is an automated email from the ASF dual-hosted git repository.
larsh pushed a commit to branch 4.x-HBase-1.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.3 by this push:
new 2f985f4 PHOENIX-5123 Avoid using MappedByteBuffers for server side GROUP BY.
2f985f4 is described below
commit 2f985f4839370915f12e440ffffd57bf8ca22866
Author: Lars Hofhansl <la...@apache.org>
AuthorDate: Wed Feb 6 13:23:41 2019 -0800
PHOENIX-5123 Avoid using MappedByteBuffers for server side GROUP BY.
---
.../apache/phoenix/cache/aggcache/SpillFile.java | 78 +++++++++---------
.../apache/phoenix/cache/aggcache/SpillMap.java | 94 ++++++++++++----------
2 files changed, 88 insertions(+), 84 deletions(-)
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillFile.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillFile.java
index 51aef98..a47cfdf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillFile.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillFile.java
@@ -51,27 +51,23 @@ public class SpillFile implements Closeable {
private Map<Integer, TempFile> tempFiles;
// Custom spill files directory
private File spillFilesDirectory = null;
-
+
// Wrapper class for a TempFile: File + RandomAccessFile
- private static class TempFile implements Closeable{
- private RandomAccessFile rndFile;
- private File file;
-
- public TempFile(File file, RandomAccessFile rndFile) {
- this.file = file;
- this.rndFile = rndFile;
- }
-
- public FileChannel getChannel() {
- return rndFile.getChannel();
- }
+ private static class TempFile implements Closeable {
+ private final RandomAccessFile rndFile;
+ private final File file;
+
+ public TempFile(File file, RandomAccessFile rndFile) {
+ this.file = file;
+ this.rndFile = rndFile;
+ }
+
+ @Override
+ public void close() throws IOException {
+ Closeables.closeQuietly(rndFile.getChannel());
+ Closeables.closeQuietly(rndFile);
- @Override
- public void close() throws IOException {
- Closeables.closeQuietly(rndFile.getChannel());
- Closeables.closeQuietly(rndFile);
-
- if (file != null) {
+ if (file != null) {
if (logger.isDebugEnabled()) {
logger.debug("Deleting tempFile: " + file.getAbsolutePath());
}
@@ -79,9 +75,9 @@ public class SpillFile implements Closeable {
file.delete();
} catch (SecurityException e) {
logger.warn("IOException thrown while closing Closeable." + e);
- }
+ }
}
- }
+ }
}
private SpillFile(File spillFilesDirectory) throws IOException {
@@ -120,29 +116,29 @@ public class SpillFile implements Closeable {
/**
* Random access to a page of the current spill file
* @param index
+ * @return a file seeked to the correct page
*/
- public MappedByteBuffer getPage(int index) {
+ public RandomAccessFile getPage(int index) {
try {
- TempFile tempFile = null;
- int fileIndex = 0;
-
- long offset = (long) index * (long) DEFAULT_PAGE_SIZE;
- if(offset >= SPILL_FILE_SIZE) {
- // Offset exceeds the first SpillFile size
- // Get the index of the file that should contain the pageID
- fileIndex = (int)(offset / SPILL_FILE_SIZE);
- if(!tempFiles.containsKey(fileIndex)) {
- // Dynamically add new spillFiles if directory grows beyond
- // max page ID.
- tempFile = createTempFile();
- tempFiles.put(fileIndex, tempFile);
- }
- }
- tempFile = tempFiles.get(fileIndex);
- // Channel gets buffered in file object
- FileChannel fc = tempFile.getChannel();
+ TempFile tempFile = null;
+ int fileIndex = 0;
- return fc.map(MapMode.READ_WRITE, offset, DEFAULT_PAGE_SIZE);
+ long offset = (long) index * (long) DEFAULT_PAGE_SIZE;
+ if (offset >= SPILL_FILE_SIZE) {
+ // Offset exceeds the first SpillFile size
+ // Get the index of the file that should contain the pageID
+ fileIndex = (int) (offset / SPILL_FILE_SIZE);
+ if (!tempFiles.containsKey(fileIndex)) {
+ // Dynamically add new spillFiles if directory grows beyond
+ // max page ID.
+ tempFile = createTempFile();
+ tempFiles.put(fileIndex, tempFile);
+ }
+ }
+ tempFile = tempFiles.get(fileIndex);
+ RandomAccessFile file = tempFile.rndFile;
+ file.seek(offset);
+ return file;
} catch (IOException ioe) {
// Close resource
close();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillMap.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillMap.java
index bb4ce2e..cff1e44 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillMap.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillMap.java
@@ -19,8 +19,8 @@
package org.apache.phoenix.cache.aggcache;
import java.io.IOException;
+import java.io.RandomAccessFile;
import java.nio.BufferOverflowException;
-import java.nio.MappedByteBuffer;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
@@ -55,7 +55,7 @@ public class SpillMap extends AbstractMap<ImmutableBytesPtr, byte[]> implements
private int curMapBufferIndex;
private SpillFile spillFile;
// Directory of hash buckets --> extendible hashing implementation
- private MappedByteBufferMap[] directory;
+ private FileMap[] directory;
private final SpillableGroupByCache.QueryCache cache;
public SpillMap(SpillFile file, int thresholdBytes, int estValueSize, SpillableGroupByCache.QueryCache cache)
@@ -67,11 +67,11 @@ public class SpillMap extends AbstractMap<ImmutableBytesPtr, byte[]> implements
// Init the e-hashing directory structure
globalDepth = 1;
- directory = new MappedByteBufferMap[(1 << globalDepth)];
+ directory = new FileMap[(1 << globalDepth)];
for (int i = 0; i < directory.length; i++) {
// Create an empty bucket list
- directory[i] = new MappedByteBufferMap(i, this.thresholdBytes, pageInserts, file);
+ directory[i] = new FileMap(i, this.thresholdBytes, pageInserts, file);
directory[i].flushBuffer();
}
directory[0].pageIn();
@@ -93,7 +93,7 @@ public class SpillMap extends AbstractMap<ImmutableBytesPtr, byte[]> implements
// for bucket splits
private void redistribute(int index, ImmutableBytesPtr keyNew, byte[] valueNew) {
// Get the respective bucket
- MappedByteBufferMap byteMap = directory[index];
+ FileMap byteMap = directory[index];
// Get the actual bucket index, that the directory index points to
int mappedIdx = byteMap.pageIndex;
@@ -119,8 +119,8 @@ public class SpillMap extends AbstractMap<ImmutableBytesPtr, byte[]> implements
int b2Index = Math.max(index, tmpIndex);
// Create two new split buckets
- MappedByteBufferMap b1 = new MappedByteBufferMap(b1Index, thresholdBytes, pageInserts, spillFile);
- MappedByteBufferMap b2 = new MappedByteBufferMap(b2Index, thresholdBytes, pageInserts, spillFile);
+ FileMap b1 = new FileMap(b1Index, thresholdBytes, pageInserts, spillFile);
+ FileMap b2 = new FileMap(b2Index, thresholdBytes, pageInserts, spillFile);
// redistribute old elements into b1 and b2
for (Entry<ImmutableBytesPtr, byte[]> element : byteMap.pageMap.entrySet()) {
@@ -182,7 +182,7 @@ public class SpillMap extends AbstractMap<ImmutableBytesPtr, byte[]> implements
Preconditions.checkArgument(newDirSize < Integer.MAX_VALUE);
// Double it!
- MappedByteBufferMap[] newDirectory = new MappedByteBufferMap[newDirSize];
+ FileMap[] newDirectory = new FileMap[newDirSize];
for (int i = 0; i < directory.length; i++) {
newDirectory[i] = directory[i];
newDirectory[i + directory.length] = directory[i];
@@ -212,12 +212,12 @@ public class SpillMap extends AbstractMap<ImmutableBytesPtr, byte[]> implements
byte[] value = null;
int bucketIndex = getBucketIndex(ikey);
- MappedByteBufferMap byteMap = directory[bucketIndex];
+ FileMap byteMap = directory[bucketIndex];
// Decision based on bucket ID, not the directory ID due to the n:1 relationship
if (directory[curMapBufferIndex].pageIndex != byteMap.pageIndex) {
// map not paged in
- MappedByteBufferMap curByteMap = directory[curMapBufferIndex];
+ FileMap curByteMap = directory[curMapBufferIndex];
// Use bloomFilter to check if key was spilled before
if (byteMap.containsKey(ikey.copyBytesIfNecessary())) {
@@ -240,10 +240,10 @@ public class SpillMap extends AbstractMap<ImmutableBytesPtr, byte[]> implements
private byte[] getAlways(ImmutableBytesPtr key) {
byte[] value = null;
int bucketIndex = getBucketIndex(key);
- MappedByteBufferMap byteMap = directory[bucketIndex];
+ FileMap byteMap = directory[bucketIndex];
if (directory[curMapBufferIndex].pageIndex != byteMap.pageIndex) {
- MappedByteBufferMap curByteMap = directory[curMapBufferIndex];
+ FileMap curByteMap = directory[curMapBufferIndex];
// ensure consistency and flush current memory page to disk
curByteMap.flushBuffer();
@@ -266,7 +266,7 @@ public class SpillMap extends AbstractMap<ImmutableBytesPtr, byte[]> implements
// page in element and replace if present
byte[] spilledValue = getAlways(key);
- MappedByteBufferMap byteMap = directory[curMapBufferIndex];
+ FileMap byteMap = directory[curMapBufferIndex];
int index = curMapBufferIndex;
// TODO: We split buckets until the new element fits onto a
@@ -308,9 +308,9 @@ public class SpillMap extends AbstractMap<ImmutableBytesPtr, byte[]> implements
* page for easy get() and update() calls on an individual key The class keeps track of the current size of the in
* memory page and handles flushing and paging in respectively
*/
- private static class MappedByteBufferMap {
- private SpillFile spillFile;
- private int pageIndex;
+ private static class FileMap {
+ private final SpillFile spillFile;
+ private final int pageIndex;
private final int thresholdBytes;
private long totalResultSize;
private boolean pagedIn;
@@ -323,7 +323,7 @@ public class SpillMap extends AbstractMap<ImmutableBytesPtr, byte[]> implements
// Used to determine is an element was written to this page before or not
BloomFilter<byte[]> bFilter;
- public MappedByteBufferMap(int id, int thresholdBytes, int pageInserts, SpillFile spillFile) {
+ public FileMap(int id, int thresholdBytes, int pageInserts, SpillFile spillFile) {
this.spillFile = spillFile;
// size threshold of a page
this.thresholdBytes = thresholdBytes;
@@ -363,24 +363,33 @@ public class SpillMap extends AbstractMap<ImmutableBytesPtr, byte[]> implements
}
// Flush the current page to the memory mapped byte buffer
- private void flushBuffer() throws BufferOverflowException {
+ private void flushBuffer() {
if (pagedIn) {
- MappedByteBuffer buffer;
// Only flush if page was changed
if (dirtyPage) {
Collection<byte[]> values = pageMap.values();
- buffer = spillFile.getPage(pageIndex);
- buffer.clear();
+ RandomAccessFile file = spillFile.getPage(pageIndex);
// number of elements
- buffer.putInt(values.size());
- for (byte[] value : values) {
- // element length
- buffer.putInt(value.length);
- // element
- buffer.put(value, 0, value.length);
+ try {
+ file.writeInt(values.size());
+ int written = Bytes.SIZEOF_INT;
+ for (byte[] value : values) {
+ written += Bytes.SIZEOF_INT + value.length;
+ // safety check
+ if (written > SpillFile.DEFAULT_PAGE_SIZE) {
+ throw new BufferOverflowException();
+ }
+ // element length
+ file.writeInt(value.length);
+ // element
+ file.write(value, 0, value.length);
+ }
+ } catch (IOException ioe) {
+ // Error during key access on spilled resource
+ // TODO rework error handling
+ throw new RuntimeException(ioe);
}
}
- buffer = null;
// Reset page stats
pageMap.clear();
totalResultSize = 0;
@@ -389,24 +398,23 @@ public class SpillMap extends AbstractMap<ImmutableBytesPtr, byte[]> implements
dirtyPage = false;
}
- // load memory mapped region into a map for fast element access
- private void pageIn() throws IndexOutOfBoundsException {
+ // load a page into a map for fast element access
+ private void pageIn() {
if (!pagedIn) {
- // Map the memory region
- MappedByteBuffer buffer = spillFile.getPage(pageIndex);
- int numElements = buffer.getInt();
+ RandomAccessFile file = spillFile.getPage(pageIndex);
+ try {
+ int numElements = file.readInt();
for (int i = 0; i < numElements; i++) {
- int kvSize = buffer.getInt();
+ int kvSize = file.readInt();
byte[] data = new byte[kvSize];
- buffer.get(data, 0, kvSize);
- try {
- pageMap.put(SpillManager.getKey(data), data);
- totalResultSize += (data.length + Bytes.SIZEOF_INT);
- } catch (IOException ioe) {
- // Error during key access on spilled resource
- // TODO rework error handling
- throw new RuntimeException(ioe);
- }
+ file.readFully(data);
+ pageMap.put(SpillManager.getKey(data), data);
+ totalResultSize += (data.length + Bytes.SIZEOF_INT);
+ }
+ } catch (IOException ioe) {
+ // Error during key access on spilled resource
+ // TODO rework error handling
+ throw new RuntimeException(ioe);
}
pagedIn = true;
dirtyPage = false;