You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by la...@apache.org on 2019/02/05 04:37:43 UTC
[phoenix] branch master updated: PHOENIX-5120 Avoid using
MappedByteBuffers for server side sorting.
This is an automated email from the ASF dual-hosted git repository.
larsh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 4f396c6 PHOENIX-5120 Avoid using MappedByteBuffers for server side sorting.
4f396c6 is described below
commit 4f396c605e934f13a8265249c280e439f4e924e0
Author: Lars Hofhansl <la...@apache.org>
AuthorDate: Mon Feb 4 20:38:02 2019 -0800
PHOENIX-5120 Avoid using MappedByteBuffers for server side sorting.
---
.../phoenix/end2end/OrderByWithSpillingIT.java | 36 ++++++
.../apache/phoenix/execute/SortMergeJoinPlan.java | 46 ++++----
...ppedByteBufferQueue.java => BufferedQueue.java} | 127 ++++++++-------------
...erSortedQueue.java => BufferedSortedQueue.java} | 55 ++++-----
.../phoenix/iterate/OrderedResultIterator.java | 2 +-
5 files changed, 136 insertions(+), 130 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithSpillingIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithSpillingIT.java
new file mode 100644
index 0000000..c5eeaff
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithSpillingIT.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import java.util.Map;
+
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+
+import com.google.common.collect.Maps;
+
+public class OrderByWithSpillingIT extends OrderByIT {
+ @BeforeClass
+ public static void doSetup() throws Exception {
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+ // do lot's of spooling!
+ props.put(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(1));
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
index b23db35..aacea23 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
@@ -20,8 +20,9 @@ package org.apache.phoenix.execute;
import static org.apache.phoenix.util.NumberUtil.add;
import static org.apache.phoenix.util.NumberUtil.getMin;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
-import java.nio.MappedByteBuffer;
import java.sql.ParameterMetaData;
import java.sql.SQLException;
import java.util.Collections;
@@ -50,7 +51,7 @@ import org.apache.phoenix.execute.visitor.ByteCountVisitor;
import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
-import org.apache.phoenix.iterate.MappedByteBufferQueue;
+import org.apache.phoenix.iterate.BufferedQueue;
import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
@@ -293,7 +294,7 @@ public class SortMergeJoinPlan implements QueryPlan {
private ValueBitSet lhsBitSet;
private ValueBitSet rhsBitSet;
private byte[] emptyProjectedValue;
- private MappedByteBufferTupleQueue queue;
+ private BufferedTupleQueue queue;
private Iterator<Tuple> queueIterator;
public BasicJoinIterator(ResultIterator lhsIterator, ResultIterator rhsIterator) {
@@ -315,7 +316,7 @@ public class SortMergeJoinPlan implements QueryPlan {
int len = lhsBitSet.getEstimatedLength();
this.emptyProjectedValue = new byte[len];
lhsBitSet.toBytes(emptyProjectedValue, 0);
- this.queue = new MappedByteBufferTupleQueue(thresholdBytes);
+ this.queue = new BufferedTupleQueue(thresholdBytes);
this.queueIterator = null;
}
@@ -609,24 +610,24 @@ public class SortMergeJoinPlan implements QueryPlan {
}
}
- private static class MappedByteBufferTupleQueue extends MappedByteBufferQueue<Tuple> {
+ private static class BufferedTupleQueue extends BufferedQueue<Tuple> {
- public MappedByteBufferTupleQueue(int thresholdBytes) {
+ public BufferedTupleQueue(int thresholdBytes) {
super(thresholdBytes);
}
@Override
- protected MappedByteBufferSegmentQueue<Tuple> createSegmentQueue(
+ protected BufferedSegmentQueue<Tuple> createSegmentQueue(
int index, int thresholdBytes) {
- return new MappedByteBufferTupleSegmentQueue(index, thresholdBytes, false);
+ return new BufferedTupleSegmentQueue(index, thresholdBytes, false);
}
@Override
- protected Comparator<MappedByteBufferSegmentQueue<Tuple>> getSegmentQueueComparator() {
- return new Comparator<MappedByteBufferSegmentQueue<Tuple>>() {
+ protected Comparator<BufferedSegmentQueue<Tuple>> getSegmentQueueComparator() {
+ return new Comparator<BufferedSegmentQueue<Tuple>>() {
@Override
- public int compare(MappedByteBufferSegmentQueue<Tuple> q1,
- MappedByteBufferSegmentQueue<Tuple> q2) {
+ public int compare(BufferedSegmentQueue<Tuple> q1,
+ BufferedSegmentQueue<Tuple> q2) {
return q1.index() - q2.index();
}
};
@@ -635,7 +636,7 @@ public class SortMergeJoinPlan implements QueryPlan {
@Override
public Iterator<Tuple> iterator() {
return new Iterator<Tuple>() {
- private Iterator<MappedByteBufferSegmentQueue<Tuple>> queueIter;
+ private Iterator<BufferedSegmentQueue<Tuple>> queueIter;
private Iterator<Tuple> currentIter;
{
this.queueIter = getSegmentQueues().iterator();
@@ -668,10 +669,10 @@ public class SortMergeJoinPlan implements QueryPlan {
};
}
- private static class MappedByteBufferTupleSegmentQueue extends MappedByteBufferSegmentQueue<Tuple> {
+ private static class BufferedTupleSegmentQueue extends BufferedSegmentQueue<Tuple> {
private LinkedList<Tuple> results;
- public MappedByteBufferTupleSegmentQueue(int index,
+ public BufferedTupleSegmentQueue(int index,
int thresholdBytes, boolean hasMaxQueueSize) {
super(index, thresholdBytes, hasMaxQueueSize);
this.results = Lists.newLinkedList();
@@ -688,23 +689,22 @@ public class SortMergeJoinPlan implements QueryPlan {
return Bytes.SIZEOF_INT * 2 + kv.getLength();
}
- @SuppressWarnings("deprecation")
@Override
- protected void writeToBuffer(MappedByteBuffer buffer, Tuple e) {
+ protected void writeToStream(DataOutputStream out, Tuple e) throws IOException {
KeyValue kv = PhoenixKeyValueUtil.maybeCopyCell(e.getValue(0));
- buffer.putInt(kv.getLength() + Bytes.SIZEOF_INT);
- buffer.putInt(kv.getLength());
- buffer.put(kv.getBuffer(), kv.getOffset(), kv.getLength());
+ out.writeInt(kv.getLength() + Bytes.SIZEOF_INT);
+ out.writeInt(kv.getLength());
+ out.write(kv.getBuffer(), kv.getOffset(), kv.getLength());
}
@Override
- protected Tuple readFromBuffer(MappedByteBuffer buffer) {
- int length = buffer.getInt();
+ protected Tuple readFromStream(DataInputStream in) throws IOException {
+ int length = in.readInt();
if (length < 0)
return null;
byte[] b = new byte[length];
- buffer.get(b);
+ in.read(b);
Result result = ResultUtil.toResult(new ImmutableBytesWritable(b));
return new ResultTuple(result);
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedQueue.java
similarity index 70%
rename from phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java
rename to phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedQueue.java
index 135ab26..6f6c523 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedQueue.java
@@ -17,13 +17,15 @@
*/
package org.apache.phoenix.iterate;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileChannel.MapMode;
import java.util.AbstractQueue;
import java.util.Comparator;
import java.util.Iterator;
@@ -34,26 +36,26 @@ import java.util.UUID;
import com.google.common.collect.Lists;
import com.google.common.collect.MinMaxPriorityQueue;
-public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> {
+public abstract class BufferedQueue<T> extends AbstractQueue<T> {
private final int thresholdBytes;
- private List<MappedByteBufferSegmentQueue<T>> queues;
+ private List<BufferedSegmentQueue<T>> queues;
private int currentIndex;
- private MappedByteBufferSegmentQueue<T> currentQueue;
- private MinMaxPriorityQueue<MappedByteBufferSegmentQueue<T>> mergedQueue;
+ private BufferedSegmentQueue<T> currentQueue;
+ private MinMaxPriorityQueue<BufferedSegmentQueue<T>> mergedQueue;
- public MappedByteBufferQueue(int thresholdBytes) {
+ public BufferedQueue(int thresholdBytes) {
this.thresholdBytes = thresholdBytes;
- this.queues = Lists.<MappedByteBufferSegmentQueue<T>> newArrayList();
+ this.queues = Lists.<BufferedSegmentQueue<T>> newArrayList();
this.currentIndex = -1;
this.currentQueue = null;
this.mergedQueue = null;
}
- abstract protected MappedByteBufferSegmentQueue<T> createSegmentQueue(int index, int thresholdBytes);
+ abstract protected BufferedSegmentQueue<T> createSegmentQueue(int index, int thresholdBytes);
- abstract protected Comparator<MappedByteBufferSegmentQueue<T>> getSegmentQueueComparator();
+ abstract protected Comparator<BufferedSegmentQueue<T>> getSegmentQueueComparator();
- protected final List<MappedByteBufferSegmentQueue<T>> getSegmentQueues() {
+ protected final List<BufferedSegmentQueue<T>> getSegmentQueues() {
return queues.subList(0, currentIndex + 1);
}
@@ -77,7 +79,7 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> {
public T poll() {
initMergedQueue();
if (mergedQueue != null && !mergedQueue.isEmpty()) {
- MappedByteBufferSegmentQueue<T> queue = mergedQueue.poll();
+ BufferedSegmentQueue<T> queue = mergedQueue.poll();
T re = queue.poll();
if (queue.peek() != null) {
mergedQueue.add(queue);
@@ -98,7 +100,7 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> {
@Override
public void clear() {
- for (MappedByteBufferSegmentQueue<T> queue : getSegmentQueues()) {
+ for (BufferedSegmentQueue<T> queue : getSegmentQueues()) {
queue.clear();
}
currentIndex = -1;
@@ -114,7 +116,7 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> {
@Override
public int size() {
int size = 0;
- for (MappedByteBufferSegmentQueue<T> queue : getSegmentQueues()) {
+ for (BufferedSegmentQueue<T> queue : getSegmentQueues()) {
size += queue.size();
}
return size;
@@ -125,7 +127,7 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> {
}
public void close() {
- for (MappedByteBufferSegmentQueue<T> queue : queues) {
+ for (BufferedSegmentQueue<T> queue : queues) {
queue.close();
}
queues.clear();
@@ -133,9 +135,9 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> {
private void initMergedQueue() {
if (mergedQueue == null && currentIndex >= 0) {
- mergedQueue = MinMaxPriorityQueue.<MappedByteBufferSegmentQueue<T>> orderedBy(
+ mergedQueue = MinMaxPriorityQueue.<BufferedSegmentQueue<T>> orderedBy(
getSegmentQueueComparator()).maximumSize(currentIndex + 1).create();
- for (MappedByteBufferSegmentQueue<T> queue : getSegmentQueues()) {
+ for (BufferedSegmentQueue<T> queue : getSegmentQueues()) {
T re = queue.peek();
if (re != null) {
mergedQueue.add(queue);
@@ -144,17 +146,14 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> {
}
}
- public abstract static class MappedByteBufferSegmentQueue<T> extends AbstractQueue<T> {
+ public abstract static class BufferedSegmentQueue<T> extends AbstractQueue<T> {
protected static final int EOF = -1;
- // at least create 128 KB MappedByteBuffers
- private static final long DEFAULT_MAPPING_SIZE = 128 * 1024;
private final int index;
private final int thresholdBytes;
private final boolean hasMaxQueueSize;
private long totalResultSize = 0;
private int maxResultSize = 0;
- private long mappingSize = 0;
private File file;
private boolean isClosed = false;
private boolean flushBuffer = false;
@@ -164,7 +163,7 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> {
// iterators to close on close()
private List<SegmentQueueFileIterator> iterators;
- public MappedByteBufferSegmentQueue(int index, int thresholdBytes, boolean hasMaxQueueSize) {
+ public BufferedSegmentQueue(int index, int thresholdBytes, boolean hasMaxQueueSize) {
this.index = index;
this.thresholdBytes = thresholdBytes;
this.hasMaxQueueSize = hasMaxQueueSize;
@@ -173,8 +172,8 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> {
abstract protected Queue<T> getInMemoryQueue();
abstract protected int sizeOf(T e);
- abstract protected void writeToBuffer(MappedByteBuffer buffer, T e);
- abstract protected T readFromBuffer(MappedByteBuffer buffer);
+ abstract protected void writeToStream(DataOutputStream out, T e) throws IOException;
+ abstract protected T readFromStream(DataInputStream in) throws IOException;
public int index() {
return this.index;
@@ -253,7 +252,6 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> {
getInMemoryQueue().clear();
this.totalResultSize = 0;
this.maxResultSize = 0;
- this.mappingSize = 0;
this.flushBuffer = false;
this.flushedCount = 0;
this.current = null;
@@ -303,38 +301,25 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> {
totalResultSize = hasMaxQueueSize ? maxResultSize * inMemQueue.size() : (totalResultSize + resultSize);
if (totalResultSize >= thresholdBytes) {
this.file = File.createTempFile(UUID.randomUUID().toString(), null);
- RandomAccessFile af = new RandomAccessFile(file, "rw");
- FileChannel fc = af.getChannel();
- int writeIndex = 0;
- mappingSize = Math.min(Math.max(maxResultSize, DEFAULT_MAPPING_SIZE), totalResultSize);
- MappedByteBuffer writeBuffer = fc.map(MapMode.READ_WRITE, writeIndex, mappingSize);
-
- int resSize = inMemQueue.size();
- for (int i = 0; i < resSize; i++) {
- T e = inMemQueue.poll();
- writeToBuffer(writeBuffer, e);
- // buffer close to exhausted, re-map.
- if (mappingSize - writeBuffer.position() < maxResultSize) {
- writeIndex += writeBuffer.position();
- writeBuffer = fc.map(MapMode.READ_WRITE, writeIndex, mappingSize);
+ try (DataOutputStream out = new DataOutputStream(
+ new BufferedOutputStream(new FileOutputStream(file)))) {
+ int resSize = inMemQueue.size();
+ for (int i = 0; i < resSize; i++) {
+ T e = inMemQueue.poll();
+ writeToStream(out, e);
}
+ out.writeInt(EOF); // end
+ flushedCount = resSize;
+ inMemQueue.clear();
+ flushBuffer = true;
}
- writeBuffer.putInt(EOF); // end
- fc.force(true);
- fc.close();
- af.close();
- flushedCount = resSize;
- inMemQueue.clear();
- flushBuffer = true;
}
}
private class SegmentQueueFileIterator implements Iterator<T>, Closeable {
private boolean isEnd;
private long readIndex;
- private RandomAccessFile af;
- private FileChannel fc;
- private MappedByteBuffer readBuffer;
+ private DataInputStream in;
private T next;
public SegmentQueueFileIterator() {
@@ -354,9 +339,8 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> {
this.readIndex = readIndex;
this.next = null;
try {
- this.af = new RandomAccessFile(file, "r");
- this.fc = af.getChannel();
- this.readBuffer = fc.map(MapMode.READ_ONLY, readIndex, mappingSize);
+ this.in = new DataInputStream(
+ new BufferedInputStream(new FileInputStream(file)));
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -384,23 +368,17 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> {
private T readNext() {
if (isEnd)
return null;
-
- T e = readFromBuffer(readBuffer);
+
+ T e = null;
+ try {
+ e = readFromStream(in);
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
if (e == null) {
close();
return null;
}
-
- // buffer close to exhausted, re-map.
- if (mappingSize - readBuffer.position() < maxResultSize) {
- readIndex += readBuffer.position();
- try {
- readBuffer = fc.map(MapMode.READ_ONLY, readIndex, mappingSize);
- } catch (IOException ex) {
- throw new RuntimeException(ex);
- }
- }
-
return e;
}
@@ -412,18 +390,9 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> {
@Override
public void close() {
this.isEnd = true;
- if (this.fc != null) {
- try {
- this.fc.close();
- } catch (IOException ignored) {
- }
- }
- if (this.af != null) {
- try {
- this.af.close();
- } catch (IOException ignored) {
- }
- this.af = null;
+ try {
+ this.in.close();
+ } catch (IOException ignored) {
}
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferSortedQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedSortedQueue.java
similarity index 74%
rename from phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferSortedQueue.java
rename to phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedSortedQueue.java
index 529a0c5..a54bb44 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferSortedQueue.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedSortedQueue.java
@@ -17,8 +17,9 @@
*/
package org.apache.phoenix.iterate;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
-import java.nio.MappedByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
@@ -36,11 +37,11 @@ import org.apache.phoenix.util.ResultUtil;
import com.google.common.collect.MinMaxPriorityQueue;
-public class MappedByteBufferSortedQueue extends MappedByteBufferQueue<ResultEntry> {
+public class BufferedSortedQueue extends BufferedQueue<ResultEntry> {
private Comparator<ResultEntry> comparator;
private final int limit;
- public MappedByteBufferSortedQueue(Comparator<ResultEntry> comparator,
+ public BufferedSortedQueue(Comparator<ResultEntry> comparator,
Integer limit, int thresholdBytes) throws IOException {
super(thresholdBytes);
this.comparator = comparator;
@@ -48,25 +49,25 @@ public class MappedByteBufferSortedQueue extends MappedByteBufferQueue<ResultEnt
}
@Override
- protected org.apache.phoenix.iterate.MappedByteBufferQueue.MappedByteBufferSegmentQueue<ResultEntry> createSegmentQueue(
+ protected BufferedSegmentQueue<ResultEntry> createSegmentQueue(
int index, int thresholdBytes) {
- return new MappedByteBufferResultEntryPriorityQueue(index, thresholdBytes, limit, comparator);
+ return new BufferedResultEntryPriorityQueue(index, thresholdBytes, limit, comparator);
}
@Override
- protected Comparator<org.apache.phoenix.iterate.MappedByteBufferQueue.MappedByteBufferSegmentQueue<ResultEntry>> getSegmentQueueComparator() {
- return new Comparator<MappedByteBufferSegmentQueue<ResultEntry>>() {
+ protected Comparator<BufferedSegmentQueue<ResultEntry>> getSegmentQueueComparator() {
+ return new Comparator<BufferedSegmentQueue<ResultEntry>>() {
@Override
- public int compare(MappedByteBufferSegmentQueue<ResultEntry> q1,
- MappedByteBufferSegmentQueue<ResultEntry> q2) {
+ public int compare(BufferedSegmentQueue<ResultEntry> q1,
+ BufferedSegmentQueue<ResultEntry> q2) {
return comparator.compare(q1.peek(), q2.peek());
}};
}
- private static class MappedByteBufferResultEntryPriorityQueue extends MappedByteBufferSegmentQueue<ResultEntry> {
+ private static class BufferedResultEntryPriorityQueue extends BufferedSegmentQueue<ResultEntry> {
private MinMaxPriorityQueue<ResultEntry> results = null;
- public MappedByteBufferResultEntryPriorityQueue(int index,
+ public BufferedResultEntryPriorityQueue(int index,
int thresholdBytes, int limit, Comparator<ResultEntry> comparator) {
super(index, thresholdBytes, limit >= 0);
this.results = limit < 0 ?
@@ -85,54 +86,54 @@ public class MappedByteBufferSortedQueue extends MappedByteBufferQueue<ResultEnt
}
@Override
- protected void writeToBuffer(MappedByteBuffer buffer, ResultEntry e) {
+ protected void writeToStream(DataOutputStream os, ResultEntry e) throws IOException {
int totalLen = 0;
List<KeyValue> keyValues = toKeyValues(e);
for (KeyValue kv : keyValues) {
totalLen += (kv.getLength() + Bytes.SIZEOF_INT);
}
- buffer.putInt(totalLen);
+ os.writeInt(totalLen);
for (KeyValue kv : keyValues) {
- buffer.putInt(kv.getLength());
- buffer.put(kv.getBuffer(), kv.getOffset(), kv
+ os.writeInt(kv.getLength());
+ os.write(kv.getBuffer(), kv.getOffset(), kv
.getLength());
}
ImmutableBytesWritable[] sortKeys = e.sortKeys;
- buffer.putInt(sortKeys.length);
+ os.writeInt(sortKeys.length);
for (ImmutableBytesWritable sortKey : sortKeys) {
if (sortKey != null) {
- buffer.putInt(sortKey.getLength());
- buffer.put(sortKey.get(), sortKey.getOffset(),
+ os.writeInt(sortKey.getLength());
+ os.write(sortKey.get(), sortKey.getOffset(),
sortKey.getLength());
} else {
- buffer.putInt(0);
+ os.writeInt(0);
}
}
}
@Override
- protected ResultEntry readFromBuffer(MappedByteBuffer buffer) {
- int length = buffer.getInt();
+ protected ResultEntry readFromStream(DataInputStream is) throws IOException {
+ int length = is.readInt();
if (length < 0)
return null;
-
+
byte[] rb = new byte[length];
- buffer.get(rb);
+ is.read(rb);
Result result = ResultUtil.toResult(new ImmutableBytesWritable(rb));
ResultTuple rt = new ResultTuple(result);
- int sortKeySize = buffer.getInt();
+ int sortKeySize = is.readInt();
ImmutableBytesWritable[] sortKeys = new ImmutableBytesWritable[sortKeySize];
for (int i = 0; i < sortKeySize; i++) {
- int contentLength = buffer.getInt();
+ int contentLength = is.readInt();
if (contentLength > 0) {
byte[] sortKeyContent = new byte[contentLength];
- buffer.get(sortKeyContent);
+ is.read(sortKeyContent);
sortKeys[i] = new ImmutableBytesWritable(sortKeyContent);
} else {
sortKeys[i] = null;
}
}
-
+
return new ResultEntry(sortKeys, rt);
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
index 36b274a..22712ff 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
@@ -208,7 +208,7 @@ public class OrderedResultIterator implements PeekingResultIterator {
List<Expression> expressions = Lists.newArrayList(Collections2.transform(orderByExpressions, TO_EXPRESSION));
final Comparator<ResultEntry> comparator = buildComparator(orderByExpressions);
try{
- final MappedByteBufferSortedQueue queueEntries = new MappedByteBufferSortedQueue(comparator, limit,
+ final BufferedSortedQueue queueEntries = new BufferedSortedQueue(comparator, limit,
thresholdBytes);
resultIterator = new PeekingResultIterator() {
int count = 0;