You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by wa...@apache.org on 2018/02/19 17:07:50 UTC
[3/7] asterixdb git commit: [ASTERIXDB-2083][COMP][RT][IDX][SITE]
Budget-Constrained Inverted index search
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afe0d3d9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeFrameTupleAccessor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeFrameTupleAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeFrameTupleAccessor.java
index 79033d2..23854f9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeFrameTupleAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeFrameTupleAccessor.java
@@ -25,6 +25,11 @@ import org.apache.hyracks.api.comm.FrameHelper;
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+/**
+ * This is a fixed-size tuple accessor class.
+ * The frame structure: [4 bytes for minimum Hyracks frame count] [fixed-size tuple 1] ... [fixed-size tuple n] ...
+ * [4 bytes for the tuple count in a frame]
+ */
public class FixedSizeFrameTupleAccessor implements IFrameTupleAccessor {
private final int frameSize;
@@ -82,12 +87,12 @@ public class FixedSizeFrameTupleAccessor implements IFrameTupleAccessor {
@Override
public int getFieldStartOffset(int tupleIndex, int fIdx) {
- return tupleIndex * tupleSize + fieldStartOffsets[fIdx];
+ return getTupleStartOffset(tupleIndex) + fieldStartOffsets[fIdx];
}
@Override
public int getTupleCount() {
- return buffer.getInt(FrameHelper.getTupleCountOffset(frameSize));
+ return buffer != null ? buffer.getInt(FrameHelper.getTupleCountOffset(frameSize)) : 0;
}
@Override
@@ -97,7 +102,7 @@ public class FixedSizeFrameTupleAccessor implements IFrameTupleAccessor {
@Override
public int getTupleStartOffset(int tupleIndex) {
- return tupleIndex * tupleSize;
+ return FixedSizeFrameTupleAppender.MINFRAME_COUNT_SIZE + tupleIndex * tupleSize;
}
@Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afe0d3d9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeFrameTupleAppender.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeFrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeFrameTupleAppender.java
index 5dd23c4..85d8576 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeFrameTupleAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeFrameTupleAppender.java
@@ -20,13 +20,25 @@
package org.apache.hyracks.storage.am.lsm.invertedindex.ondisk;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import org.apache.hyracks.api.comm.FrameHelper;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+/**
+ * An appender class for an inverted list. Each frame stores one integer at its beginning and one at its end.
+ * The former represents the number of minimum Hyracks frames in a frame. Currently, we use 1 for this value.
+ * The latter represents the number of tuples in a frame. This design is required since we may need to use
+ * the RunFileWriter and RunFileReader classes during the inverted-index-search operation.
+ */
public class FixedSizeFrameTupleAppender {
- private static final int TUPLE_COUNT_SIZE = 4;
+ // At the end of a frame, an integer value is written to keep the tuple count in this frame.
+ public static final int TUPLE_COUNT_SIZE = 4;
+ // At the beginning of a frame, an integer value is written to keep the number of minimum frames in this frame.
+ // For this class, the frame size is equal to the minimum frame size in Hyracks.
+ public static final int MINFRAME_COUNT_SIZE = 4;
+
private final int frameSize;
private final int tupleSize;
private ByteBuffer buffer;
@@ -42,13 +54,22 @@ public class FixedSizeFrameTupleAppender {
tupleSize = tmp;
}
- public void reset(ByteBuffer buffer, boolean clear) {
+ public void reset(ByteBuffer buffer) {
+ reset(buffer, true, 0, MINFRAME_COUNT_SIZE);
+ }
+
+ public void reset(ByteBuffer buffer, boolean clear, int tupleCount, int tupleDataEndOffset) {
this.buffer = buffer;
if (clear) {
- buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), 0);
- tupleCount = 0;
- tupleDataEndOffset = 0;
+ Arrays.fill(this.buffer.array(), (byte) 0);
+ this.buffer.clear();
+ // the number of minimum frames in a frame - it's one.
+ FrameHelper.serializeFrameSize(this.buffer, 1);
}
+ // tuple count
+ this.buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
+ this.tupleCount = tupleCount;
+ this.tupleDataEndOffset = tupleDataEndOffset;
}
public boolean append(byte[] bytes, int offset) {
@@ -128,4 +149,5 @@ public class FixedSizeFrameTupleAppender {
public ByteBuffer getBuffer() {
return buffer;
}
+
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afe0d3d9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
index 8e8cb13..2f4f1d6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
@@ -21,15 +21,14 @@ package org.apache.hyracks.storage.am.lsm.invertedindex.ondisk;
import java.io.DataOutput;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.api.util.HyracksConstants;
import org.apache.hyracks.data.std.primitive.IntegerPointable;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
@@ -48,7 +47,8 @@ import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInPlaceInvertedIndex
import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexAccessor;
import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearcher;
import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedListBuilder;
-import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
+import org.apache.hyracks.storage.am.lsm.invertedindex.api.InvertedListCursor;
+import org.apache.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexSearchCursorInitialState;
import org.apache.hyracks.storage.am.lsm.invertedindex.search.InvertedIndexSearchPredicate;
import org.apache.hyracks.storage.am.lsm.invertedindex.search.TOccurrenceSearcher;
import org.apache.hyracks.storage.common.IIndexAccessParameters;
@@ -70,8 +70,6 @@ import org.apache.hyracks.storage.common.file.BufferedFileHandle;
* cannot exceed the size of a Hyracks frame.
*/
public class OnDiskInvertedIndex implements IInPlaceInvertedIndex {
- protected final IHyracksCommonContext ctx = new DefaultHyracksCommonContext();
-
// Schema of BTree tuples, set in constructor.
protected final int invListStartPageIdField;
protected final int invListEndPageIdField;
@@ -185,12 +183,17 @@ public class OnDiskInvertedIndex implements IInPlaceInvertedIndex {
}
@Override
- public IInvertedListCursor createInvertedListCursor() {
- return new FixedSizeElementInvertedListCursor(bufferCache, fileId, invListTypeTraits);
+ public InvertedListCursor createInvertedListCursor(IHyracksTaskContext ctx) throws HyracksDataException {
+ return new FixedSizeElementInvertedListCursor(bufferCache, fileId, invListTypeTraits, ctx);
}
@Override
- public void openInvertedListCursor(IInvertedListCursor listCursor, ITupleReference searchKey,
+ public InvertedListCursor createInvertedListRangeSearchCursor() throws HyracksDataException {
+ return new FixedSizeElementInvertedListScanCursor(bufferCache, fileId, invListTypeTraits);
+ }
+
+ @Override
+ public void openInvertedListCursor(InvertedListCursor listCursor, ITupleReference searchKey,
IIndexOperationContext ictx) throws HyracksDataException {
OnDiskInvertedIndexOpContext ctx = (OnDiskInvertedIndexOpContext) ictx;
ctx.getBtreePred().setLowKeyComparator(ctx.getSearchCmp());
@@ -201,16 +204,19 @@ public class OnDiskInvertedIndex implements IInPlaceInvertedIndex {
try {
if (ctx.getBtreeCursor().hasNext()) {
ctx.getBtreeCursor().next();
- resetInvertedListCursor(ctx.getBtreeCursor().getTuple(), listCursor);
+ openInvertedListCursor(ctx.getBtreeCursor().getTuple(), listCursor);
} else {
- listCursor.reset(0, 0, 0, 0);
+ LSMInvertedIndexSearchCursorInitialState initState = new LSMInvertedIndexSearchCursorInitialState();
+ initState.setInvertedListInfo(0, 0, 0, 0);
+ listCursor.open(initState, null);
}
} finally {
ctx.getBtreeCursor().close();
}
}
- public void resetInvertedListCursor(ITupleReference btreeTuple, IInvertedListCursor listCursor) {
+ public void openInvertedListCursor(ITupleReference btreeTuple, InvertedListCursor listCursor)
+ throws HyracksDataException {
int startPageId = IntegerPointable.getInteger(btreeTuple.getFieldData(invListStartPageIdField),
btreeTuple.getFieldStart(invListStartPageIdField));
int endPageId = IntegerPointable.getInteger(btreeTuple.getFieldData(invListEndPageIdField),
@@ -219,7 +225,9 @@ public class OnDiskInvertedIndex implements IInPlaceInvertedIndex {
btreeTuple.getFieldStart(invListStartOffField));
int numElements = IntegerPointable.getInteger(btreeTuple.getFieldData(invListNumElementsField),
btreeTuple.getFieldStart(invListNumElementsField));
- listCursor.reset(startPageId, endPageId, startOff, numElements);
+ LSMInvertedIndexSearchCursorInitialState initState = new LSMInvertedIndexSearchCursorInitialState();
+ initState.setInvertedListInfo(startPageId, endPageId, startOff, numElements);
+ listCursor.open(initState, null);
}
public final class OnDiskInvertedIndexBulkLoader implements IIndexBulkLoader {
@@ -416,45 +424,48 @@ public class OnDiskInvertedIndex implements IInPlaceInvertedIndex {
}
public class OnDiskInvertedIndexAccessor implements IInvertedIndexAccessor {
- private final OnDiskInvertedIndex index;
- private final IInvertedIndexSearcher searcher;
- private final IIndexOperationContext opCtx = new OnDiskInvertedIndexOpContext(btree);
+ protected final OnDiskInvertedIndex index;
+ protected final IIndexOperationContext opCtx;
+ protected final IHyracksTaskContext ctx;
+ protected IInvertedIndexSearcher searcher;
private boolean destroyed = false;
- public OnDiskInvertedIndexAccessor(OnDiskInvertedIndex index) throws HyracksDataException {
- this.index = index;
- this.searcher = new TOccurrenceSearcher(ctx, index);
- }
-
- // Let subclasses initialize.
- protected OnDiskInvertedIndexAccessor(OnDiskInvertedIndex index, IInvertedIndexSearcher searcher) {
+ public OnDiskInvertedIndexAccessor(OnDiskInvertedIndex index, IHyracksTaskContext ctx)
+ throws HyracksDataException {
this.index = index;
- this.searcher = searcher;
+ this.ctx = ctx;
+ this.opCtx = new OnDiskInvertedIndexOpContext(btree);
}
@Override
- public IIndexCursor createSearchCursor(boolean exclusive) {
- return new OnDiskInvertedIndexSearchCursor(searcher, index.getInvListTypeTraits().length);
+ public IIndexCursor createSearchCursor(boolean exclusive) throws HyracksDataException {
+ if (searcher == null) {
+ searcher = new TOccurrenceSearcher(index, ctx);
+ }
+ return new OnDiskInvertedIndexSearchCursor(searcher);
}
@Override
public void search(IIndexCursor cursor, ISearchPredicate searchPred) throws HyracksDataException {
- searcher.search((OnDiskInvertedIndexSearchCursor) cursor, (InvertedIndexSearchPredicate) searchPred, opCtx);
+ if (searcher == null) {
+ searcher = new TOccurrenceSearcher(index, ctx);
+ }
+ searcher.search(cursor, (InvertedIndexSearchPredicate) searchPred, opCtx);
}
@Override
- public IInvertedListCursor createInvertedListCursor() {
- return index.createInvertedListCursor();
+ public InvertedListCursor createInvertedListCursor() throws HyracksDataException {
+ return index.createInvertedListCursor(ctx);
}
@Override
- public void openInvertedListCursor(IInvertedListCursor listCursor, ITupleReference searchKey)
+ public void openInvertedListCursor(InvertedListCursor listCursor, ITupleReference searchKey)
throws HyracksDataException {
index.openInvertedListCursor(listCursor, searchKey, opCtx);
}
@Override
- public IIndexCursor createRangeSearchCursor() {
+ public IIndexCursor createRangeSearchCursor() throws HyracksDataException {
return new OnDiskInvertedIndexRangeSearchCursor(index, opCtx);
}
@@ -496,47 +507,8 @@ public class OnDiskInvertedIndex implements IInPlaceInvertedIndex {
@Override
public OnDiskInvertedIndexAccessor createAccessor(IIndexAccessParameters iap) throws HyracksDataException {
- return new OnDiskInvertedIndexAccessor(this);
- }
-
- // This is just a dummy hyracks context for allocating frames for temporary
- // results during inverted index searches.
- // TODO: In the future we should use the real HyracksTaskContext to track
- // frame usage.
- public static class DefaultHyracksCommonContext implements IHyracksCommonContext {
- private final int FRAME_SIZE = 32768;
-
- @Override
- public int getInitialFrameSize() {
- return FRAME_SIZE;
- }
-
- @Override
- public IIOManager getIoManager() {
- return null;
- }
-
- @Override
- public ByteBuffer allocateFrame() {
- return ByteBuffer.allocate(FRAME_SIZE);
- }
-
- @Override
- public ByteBuffer allocateFrame(int bytes) throws HyracksDataException {
- return ByteBuffer.allocate(bytes);
- }
-
- @Override
- public ByteBuffer reallocateFrame(ByteBuffer bytes, int newSizeInBytes, boolean copyOldData)
- throws HyracksDataException {
- throw new HyracksDataException("TODO");
- }
-
- @Override
- public void deallocateFrames(int bytes) {
- // TODO Auto-generated method stub
-
- }
+ return new OnDiskInvertedIndexAccessor(this,
+ (IHyracksTaskContext) iap.getParameters().get(HyracksConstants.HYRACKS_TASK_CONTEXT));
}
@Override
@@ -576,19 +548,21 @@ public class OnDiskInvertedIndex implements IInPlaceInvertedIndex {
fieldPermutation[i] = i;
}
PermutingTupleReference tokenTuple = new PermutingTupleReference(fieldPermutation);
+ IIndexOperationContext opCtx = new OnDiskInvertedIndexOpContext(btree);
// Search key for finding an inverted-list in the actual index.
ArrayTupleBuilder prevBuilder = new ArrayTupleBuilder(invListTypeTraits.length);
ArrayTupleReference prevTuple = new ArrayTupleReference();
IInvertedIndexAccessor invIndexAccessor = createAccessor(NoOpIndexAccessParameters.INSTANCE);
try {
- IInvertedListCursor invListCursor = invIndexAccessor.createInvertedListCursor();
+ InvertedListCursor invListCursor = createInvertedListRangeSearchCursor();
MultiComparator invListCmp = MultiComparator.create(invListCmpFactories);
while (btreeCursor.hasNext()) {
btreeCursor.next();
tokenTuple.reset(btreeCursor.getTuple());
// Validate inverted list by checking that the elements are totally ordered.
- invIndexAccessor.openInvertedListCursor(invListCursor, tokenTuple);
- invListCursor.pinPages();
+ openInvertedListCursor(invListCursor, tokenTuple, opCtx);
+ invListCursor.prepareLoadPages();
+ invListCursor.loadPages();
try {
if (invListCursor.hasNext()) {
invListCursor.next();
@@ -607,7 +581,8 @@ public class OnDiskInvertedIndex implements IInPlaceInvertedIndex {
prevTuple.reset(prevBuilder.getFieldEndOffsets(), prevBuilder.getByteArray());
}
} finally {
- invListCursor.unpinPages();
+ invListCursor.unloadPages();
+ invListCursor.close();
}
}
} finally {
@@ -664,4 +639,5 @@ public class OnDiskInvertedIndex implements IInPlaceInvertedIndex {
bufferCache.purgeHandle(fileId);
fileId = -1;
}
+
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afe0d3d9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexOpContext.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexOpContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexOpContext.java
index 89d4e9a..267cc79 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexOpContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexOpContext.java
@@ -39,7 +39,7 @@ public class OnDiskInvertedIndexOpContext implements IIndexOperationContext {
private MultiComparator prefixSearchCmp;
private boolean destroyed = false;
- public OnDiskInvertedIndexOpContext(BTree btree) {
+ public OnDiskInvertedIndexOpContext(BTree btree) throws HyracksDataException {
// TODO: Ignore opcallbacks for now.
btreeAccessor = btree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
btreeCursor = btreeAccessor.createSearchCursor(false);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afe0d3d9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexRangeSearchCursor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexRangeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexRangeSearchCursor.java
index 7af35ff..a33b045 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexRangeSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexRangeSearchCursor.java
@@ -27,7 +27,7 @@ import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
import org.apache.hyracks.storage.am.common.tuples.ConcatenatingTupleReference;
import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInPlaceInvertedIndex;
-import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
+import org.apache.hyracks.storage.am.lsm.invertedindex.api.InvertedListCursor;
import org.apache.hyracks.storage.common.EnforcedIndexCursor;
import org.apache.hyracks.storage.common.ICursorInitialState;
import org.apache.hyracks.storage.common.IIndexAccessor;
@@ -43,8 +43,8 @@ public class OnDiskInvertedIndexRangeSearchCursor extends EnforcedIndexCursor {
private final IIndexAccessor btreeAccessor;
private final IInPlaceInvertedIndex invIndex;
private final IIndexOperationContext opCtx;
- private final IInvertedListCursor invListCursor;
- private boolean unpinNeeded;
+ private final InvertedListCursor invListRangeSearchCursor;
+ private boolean isInvListCursorOpen;
private final IIndexCursor btreeCursor;
private RangePredicate btreePred;
@@ -52,7 +52,8 @@ public class OnDiskInvertedIndexRangeSearchCursor extends EnforcedIndexCursor {
private final PermutingTupleReference tokenTuple;
private ConcatenatingTupleReference concatTuple;
- public OnDiskInvertedIndexRangeSearchCursor(IInPlaceInvertedIndex invIndex, IIndexOperationContext opCtx) {
+ public OnDiskInvertedIndexRangeSearchCursor(IInPlaceInvertedIndex invIndex, IIndexOperationContext opCtx)
+ throws HyracksDataException {
this.btree = ((OnDiskInvertedIndex) invIndex).getBTree();
this.btreeAccessor = btree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
this.invIndex = invIndex;
@@ -65,64 +66,59 @@ public class OnDiskInvertedIndexRangeSearchCursor extends EnforcedIndexCursor {
tokenTuple = new PermutingTupleReference(fieldPermutation);
btreeCursor = btreeAccessor.createSearchCursor(false);
concatTuple = new ConcatenatingTupleReference(2);
- invListCursor = invIndex.createInvertedListCursor();
- unpinNeeded = false;
+ invListRangeSearchCursor = invIndex.createInvertedListRangeSearchCursor();
+ isInvListCursorOpen = false;
}
@Override
public void doOpen(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
this.btreePred = (RangePredicate) searchPred;
btreeAccessor.search(btreeCursor, btreePred);
- invListCursor.pinPages();
- unpinNeeded = true;
+ openInvListRangeSearchCursor();
}
@Override
public boolean doHasNext() throws HyracksDataException {
- if (invListCursor.hasNext()) {
- return true;
- }
- if (unpinNeeded) {
- invListCursor.unpinPages();
- unpinNeeded = false;
- }
- if (!btreeCursor.hasNext()) {
+ // No more results possible
+ if (!isInvListCursorOpen) {
return false;
}
- btreeCursor.next();
- tokenTuple.reset(btreeCursor.getTuple());
- invIndex.openInvertedListCursor(invListCursor, tokenTuple, opCtx);
- invListCursor.pinPages();
- invListCursor.hasNext();
- unpinNeeded = true;
- concatTuple.reset();
- concatTuple.addTuple(tokenTuple);
- return true;
+ if (invListRangeSearchCursor.hasNext()) {
+ return true;
+ }
+ // The current inverted-list-range-search cursor is exhausted.
+ invListRangeSearchCursor.unloadPages();
+ invListRangeSearchCursor.close();
+ isInvListCursorOpen = false;
+ openInvListRangeSearchCursor();
+ return isInvListCursorOpen;
}
@Override
public void doNext() throws HyracksDataException {
- invListCursor.next();
+ invListRangeSearchCursor.next();
if (concatTuple.hasMaxTuples()) {
concatTuple.removeLastTuple();
}
- concatTuple.addTuple(invListCursor.getTuple());
+ concatTuple.addTuple(invListRangeSearchCursor.getTuple());
}
@Override
public void doDestroy() throws HyracksDataException {
- if (unpinNeeded) {
- invListCursor.unpinPages();
- unpinNeeded = false;
+ if (isInvListCursorOpen) {
+ invListRangeSearchCursor.unloadPages();
+ invListRangeSearchCursor.destroy();
+ isInvListCursorOpen = false;
}
btreeCursor.destroy();
}
@Override
public void doClose() throws HyracksDataException {
- if (unpinNeeded) {
- invListCursor.unpinPages();
- unpinNeeded = false;
+ if (isInvListCursorOpen) {
+ invListRangeSearchCursor.unloadPages();
+ invListRangeSearchCursor.close();
+ isInvListCursorOpen = false;
}
btreeCursor.close();
}
@@ -131,4 +127,20 @@ public class OnDiskInvertedIndexRangeSearchCursor extends EnforcedIndexCursor {
public ITupleReference doGetTuple() {
return concatTuple;
}
+
+ // Advances the B-tree cursor and, if a token tuple remains, opens the inverted-list-scan cursor for it.
+ private void openInvListRangeSearchCursor() throws HyracksDataException {
+ if (btreeCursor.hasNext()) {
+ btreeCursor.next();
+ tokenTuple.reset(btreeCursor.getTuple());
+ invIndex.openInvertedListCursor(invListRangeSearchCursor, tokenTuple, opCtx);
+ invListRangeSearchCursor.prepareLoadPages();
+ invListRangeSearchCursor.loadPages();
+ concatTuple.reset();
+ concatTuple.addTuple(tokenTuple);
+ isInvListCursorOpen = true;
+ } else {
+ isInvListCursorOpen = false;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afe0d3d9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexSearchCursor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexSearchCursor.java
index 0563ec9..4c521fd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexSearchCursor.java
@@ -19,94 +19,55 @@
package org.apache.hyracks.storage.am.lsm.invertedindex.ondisk;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearcher;
import org.apache.hyracks.storage.common.EnforcedIndexCursor;
import org.apache.hyracks.storage.common.ICursorInitialState;
import org.apache.hyracks.storage.common.ISearchPredicate;
+/**
+ * A search cursor that fetches the result from an IInvertedIndexSearcher instance.
+ */
public class OnDiskInvertedIndexSearchCursor extends EnforcedIndexCursor {
- private List<ByteBuffer> resultBuffers;
- private int numResultBuffers;
- private int currentBufferIndex = 0;
- private int tupleIndex = 0;
private final IInvertedIndexSearcher invIndexSearcher;
- private final IFrameTupleAccessor fta;
- private final FixedSizeTupleReference frameTuple;
- private final PermutingTupleReference resultTuple;
- public OnDiskInvertedIndexSearchCursor(IInvertedIndexSearcher invIndexSearcher, int numInvListFields) {
+ public OnDiskInvertedIndexSearchCursor(IInvertedIndexSearcher invIndexSearcher) {
this.invIndexSearcher = invIndexSearcher;
- this.fta = invIndexSearcher.createResultFrameTupleAccessor();
- this.frameTuple = (FixedSizeTupleReference) invIndexSearcher.createResultFrameTupleReference();
- // Project away the occurrence count from the result tuples.
- int[] fieldPermutation = new int[numInvListFields];
- for (int i = 0; i < numInvListFields; i++) {
- fieldPermutation[i] = i;
- }
- resultTuple = new PermutingTupleReference(fieldPermutation);
}
@Override
public void doOpen(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
- currentBufferIndex = 0;
- tupleIndex = 0;
- resultBuffers = invIndexSearcher.getResultBuffers();
- numResultBuffers = invIndexSearcher.getNumValidResultBuffers();
- if (numResultBuffers > 0) {
- fta.reset(resultBuffers.get(0));
- }
+ // No-op for this cursor since all necessary information is already set in the given searcher.
+ // This class is just a wrapper.
}
@Override
- public boolean doHasNext() {
- if (currentBufferIndex < numResultBuffers && tupleIndex < fta.getTupleCount()) {
- return true;
- } else {
- return false;
- }
+ public boolean doHasNext() throws HyracksDataException {
+ return invIndexSearcher.hasNext();
}
@Override
- public void doNext() {
- frameTuple.reset(fta.getBuffer().array(), fta.getTupleStartOffset(tupleIndex));
- resultTuple.reset(frameTuple);
- tupleIndex++;
- if (tupleIndex >= fta.getTupleCount()) {
- if (currentBufferIndex + 1 < numResultBuffers) {
- currentBufferIndex++;
- fta.reset(resultBuffers.get(currentBufferIndex));
- tupleIndex = 0;
- }
- }
+ public void doNext() throws HyracksDataException {
+ invIndexSearcher.next();
}
@Override
public ITupleReference doGetTuple() {
- return resultTuple;
+ return invIndexSearcher.getTuple();
}
@Override
- public void doClose() {
- currentBufferIndex = 0;
- tupleIndex = 0;
- invIndexSearcher.reset();
- resultBuffers = invIndexSearcher.getResultBuffers();
- numResultBuffers = invIndexSearcher.getNumValidResultBuffers();
+ public void doClose() throws HyracksDataException {
+ doDestroy();
}
@Override
public void doDestroy() throws HyracksDataException {
- currentBufferIndex = 0;
- tupleIndex = 0;
- resultBuffers = null;
- numResultBuffers = 0;
+ if (invIndexSearcher != null) {
+ invIndexSearcher.destroy();
+ }
}
+
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afe0d3d9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java
index 064a26d..eff4f5a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java
@@ -19,23 +19,26 @@
package org.apache.hyracks.storage.am.lsm.invertedindex.ondisk;
-import java.util.List;
-
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.util.HyracksConstants;
import org.apache.hyracks.data.std.primitive.ShortPointable;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.api.IIndexOperationContext;
import org.apache.hyracks.storage.am.common.api.IPageManagerFactory;
import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearcher;
import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedListBuilder;
-import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
import org.apache.hyracks.storage.am.lsm.invertedindex.api.IPartitionedInvertedIndex;
+import org.apache.hyracks.storage.am.lsm.invertedindex.api.InvertedListCursor;
+import org.apache.hyracks.storage.am.lsm.invertedindex.search.InvertedIndexSearchPredicate;
import org.apache.hyracks.storage.am.lsm.invertedindex.search.InvertedListPartitions;
import org.apache.hyracks.storage.am.lsm.invertedindex.search.PartitionedTOccurrenceSearcher;
import org.apache.hyracks.storage.common.IIndexAccessParameters;
+import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
public class PartitionedOnDiskInvertedIndex extends OnDiskInvertedIndex implements IPartitionedInvertedIndex {
@@ -51,21 +54,39 @@ public class PartitionedOnDiskInvertedIndex extends OnDiskInvertedIndex implemen
}
public class PartitionedOnDiskInvertedIndexAccessor extends OnDiskInvertedIndexAccessor {
- public PartitionedOnDiskInvertedIndexAccessor(OnDiskInvertedIndex index) throws HyracksDataException {
- super(index, new PartitionedTOccurrenceSearcher(ctx, index));
+ public PartitionedOnDiskInvertedIndexAccessor(OnDiskInvertedIndex index, IHyracksTaskContext ctx)
+ throws HyracksDataException {
+ super(index, ctx);
+ }
+
+ @Override
+ public IIndexCursor createSearchCursor(boolean exclusive) throws HyracksDataException {
+ if (searcher == null) { // Lazily create the partitioned T-occurrence searcher on first use.
+ searcher = new PartitionedTOccurrenceSearcher(index, ctx);
+ }
+ return new OnDiskInvertedIndexSearchCursor(searcher); // The 'exclusive' argument is not consulted here.
+ }
+
+ @Override
+ public void search(IIndexCursor cursor, ISearchPredicate searchPred) throws HyracksDataException {
+ if (searcher == null) { // Lazily create the searcher in case no search cursor was created first.
+ searcher = new PartitionedTOccurrenceSearcher(index, ctx);
+ }
+ searcher.search(cursor, (InvertedIndexSearchPredicate) searchPred, opCtx); // ClassCastException for any other predicate type.
}
}
@Override
public PartitionedOnDiskInvertedIndexAccessor createAccessor(IIndexAccessParameters iap)
throws HyracksDataException {
- return new PartitionedOnDiskInvertedIndexAccessor(this);
+ return new PartitionedOnDiskInvertedIndexAccessor(this,
+ (IHyracksTaskContext) iap.getParameters().get(HyracksConstants.HYRACKS_TASK_CONTEXT));
}
@Override
public boolean openInvertedListPartitionCursors(IInvertedIndexSearcher searcher, IIndexOperationContext ictx,
- short numTokensLowerBound, short numTokensUpperBound, InvertedListPartitions invListPartitions,
- List<IInvertedListCursor> cursorsOrderedByTokens) throws HyracksDataException {
+ short numTokensLowerBound, short numTokensUpperBound, InvertedListPartitions invListPartitions)
+ throws HyracksDataException {
PartitionedTOccurrenceSearcher partSearcher = (PartitionedTOccurrenceSearcher) searcher;
OnDiskInvertedIndexOpContext ctx = (OnDiskInvertedIndexOpContext) ictx;
ITupleReference lowSearchKey = null;
@@ -95,9 +116,8 @@ public class PartitionedOnDiskInvertedIndex extends OnDiskInvertedIndex implemen
ITupleReference btreeTuple = ctx.getBtreeCursor().getTuple();
short numTokens = ShortPointable.getShort(btreeTuple.getFieldData(PARTITIONING_NUM_TOKENS_FIELD),
btreeTuple.getFieldStart(PARTITIONING_NUM_TOKENS_FIELD));
- IInvertedListCursor invListCursor = partSearcher.getCachedInvertedListCursor();
- resetInvertedListCursor(btreeTuple, invListCursor);
- cursorsOrderedByTokens.add(invListCursor);
+ InvertedListCursor invListCursor = partSearcher.getCachedInvertedListCursor();
+ openInvertedListCursor(btreeTuple, invListCursor);
invListPartitions.addInvertedListCursor(invListCursor, numTokens);
tokenExists = true;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afe0d3d9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/AbstractTOccurrenceSearcher.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/AbstractTOccurrenceSearcher.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/AbstractTOccurrenceSearcher.java
index 294eb04..ff9a2f1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/AbstractTOccurrenceSearcher.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/AbstractTOccurrenceSearcher.java
@@ -25,22 +25,25 @@ import java.util.List;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
import org.apache.hyracks.data.std.primitive.IntegerPointable;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppenderAccessor;
import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.utils.TaskUtil;
+import org.apache.hyracks.dataflow.std.buffermanager.BufferManagerBackedVSizeFrame;
+import org.apache.hyracks.dataflow.std.buffermanager.ISimpleFrameBufferManager;
import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInPlaceInvertedIndex;
import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearcher;
-import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
import org.apache.hyracks.storage.am.lsm.invertedindex.api.IObjectFactory;
+import org.apache.hyracks.storage.am.lsm.invertedindex.api.InvertedListCursor;
import org.apache.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeFrameTupleAccessor;
import org.apache.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeTupleReference;
import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.DelimitedUTF8StringBinaryTokenizer;
@@ -57,10 +60,12 @@ public abstract class AbstractTOccurrenceSearcher implements IInvertedIndexSearc
protected final int OBJECT_CACHE_INIT_SIZE = 10;
protected final int OBJECT_CACHE_EXPAND_SIZE = 10;
- protected final IHyracksCommonContext ctx;
+ protected final IHyracksTaskContext ctx;
protected final InvertedListMerger invListMerger;
- protected final SearchResult searchResult;
+ // Final search result is needed because multiple merge() calls can happen.
+ // We can't just use one of intermediate results as the final search result.
+ protected final InvertedIndexFinalSearchResult finalSearchResult;
protected final IInPlaceInvertedIndex invIndex;
protected final MultiComparator invListCmp;
@@ -71,28 +76,51 @@ public abstract class AbstractTOccurrenceSearcher implements IInvertedIndexSearc
protected int occurrenceThreshold;
- protected final IObjectFactory<IInvertedListCursor> invListCursorFactory;
- protected final ObjectCache<IInvertedListCursor> invListCursorCache;
+ protected final IObjectFactory<InvertedListCursor> invListCursorFactory;
+ protected final ObjectCache<InvertedListCursor> invListCursorCache;
- public AbstractTOccurrenceSearcher(IHyracksCommonContext ctx, IInPlaceInvertedIndex invIndex)
+ protected final ISimpleFrameBufferManager bufferManager;
+ protected boolean isFinishedSearch;
+
+ // For a single inverted list case
+ protected InvertedListCursor singleInvListCursor;
+ protected boolean isSingleInvertedList;
+
+ // To read the final search result
+ protected ByteBuffer searchResultBuffer;
+ protected int searchResultTupleIndex = 0;
+ protected final IFrameTupleAccessor searchResultFta;
+ protected FixedSizeTupleReference searchResultTuple;
+
+ public AbstractTOccurrenceSearcher(IInPlaceInvertedIndex invIndex, IHyracksTaskContext ctx)
throws HyracksDataException {
- this.ctx = ctx;
- this.invListMerger = new InvertedListMerger(ctx, invIndex);
- this.searchResult = new SearchResult(invIndex.getInvListTypeTraits(), ctx);
this.invIndex = invIndex;
+ this.ctx = ctx;
+ if (ctx == null) {
+ throw HyracksDataException.create(ErrorCode.CANNOT_CONTINUE_TEXT_SEARCH_HYRACKS_TASK_IS_NULL);
+ }
+ this.bufferManager = TaskUtil.get(HyracksConstants.INVERTED_INDEX_SEARCH_FRAME_MANAGER, ctx);
+ if (bufferManager == null) {
+ throw HyracksDataException.create(ErrorCode.CANNOT_CONTINUE_TEXT_SEARCH_BUFFER_MANAGER_IS_NULL);
+ }
+ this.finalSearchResult =
+ new InvertedIndexFinalSearchResult(invIndex.getInvListTypeTraits(), ctx, bufferManager);
+ this.invListMerger = new InvertedListMerger(ctx, invIndex, bufferManager);
this.invListCmp = MultiComparator.create(invIndex.getInvListCmpFactories());
- this.invListCursorFactory = new InvertedListCursorFactory(invIndex);
+ this.invListCursorFactory = new InvertedListCursorFactory(invIndex, ctx);
this.invListCursorCache =
new ObjectCache<>(invListCursorFactory, OBJECT_CACHE_INIT_SIZE, OBJECT_CACHE_EXPAND_SIZE);
- this.queryTokenFrame = new VSizeFrame(ctx);
+ this.queryTokenFrame = new BufferManagerBackedVSizeFrame(ctx, bufferManager);
+ if (queryTokenFrame.getBuffer() == null) {
+ throw HyracksDataException.create(ErrorCode.NOT_ENOUGH_BUDGET_FOR_TEXTSEARCH,
+ this.getClass().getSimpleName());
+ }
this.queryTokenAppender = new FrameTupleAppenderAccessor(QUERY_TOKEN_REC_DESC);
this.queryTokenAppender.reset(queryTokenFrame, true);
- }
-
- @Override
- public void reset() {
- searchResult.clear();
- invListMerger.reset();
+ this.isSingleInvertedList = false;
+ this.searchResultTuple = new FixedSizeTupleReference(invIndex.getInvListTypeTraits());
+ this.searchResultFta =
+ new FixedSizeFrameTupleAccessor(ctx.getInitialFrameSize(), invIndex.getInvListTypeTraits());
}
protected void tokenizeQuery(InvertedIndexSearchPredicate searchPred) throws HyracksDataException {
@@ -100,7 +128,7 @@ public abstract class AbstractTOccurrenceSearcher implements IInvertedIndexSearc
int queryFieldIndex = searchPred.getQueryFieldIndex();
IBinaryTokenizer queryTokenizer = searchPred.getQueryTokenizer();
// Is this a full-text query?
- // Then, the last argument is conjuctive or disjunctive search option, not a query text.
+ // Then, the last argument is conjunctive or disjunctive search option, not a query text.
// Thus, we need to remove the last argument.
boolean isFullTextSearchQuery = searchPred.getIsFullTextSearchQuery();
// Get the type of query tokenizer.
@@ -144,33 +172,13 @@ public abstract class AbstractTOccurrenceSearcher implements IInvertedIndexSearc
}
}
- @Override
- public IFrameTupleAccessor createResultFrameTupleAccessor() {
- return new FixedSizeFrameTupleAccessor(ctx.getInitialFrameSize(), searchResult.getTypeTraits());
- }
-
- @Override
- public ITupleReference createResultFrameTupleReference() {
- return new FixedSizeTupleReference(searchResult.getTypeTraits());
- }
-
- @Override
- public List<ByteBuffer> getResultBuffers() {
- return searchResult.getBuffers();
- }
-
- @Override
- public int getNumValidResultBuffers() {
- return searchResult.getCurrentBufferIndex() + 1;
- }
-
public int getOccurrenceThreshold() {
return occurrenceThreshold;
}
public void printNewResults(int maxResultBufIdx, List<ByteBuffer> buffer) {
StringBuffer strBuffer = new StringBuffer();
- FixedSizeFrameTupleAccessor resultFrameTupleAcc = searchResult.getAccessor();
+ FixedSizeFrameTupleAccessor resultFrameTupleAcc = finalSearchResult.getAccessor();
for (int i = 0; i <= maxResultBufIdx; i++) {
ByteBuffer testBuf = buffer.get(i);
resultFrameTupleAcc.reset(testBuf);
@@ -183,4 +191,99 @@ public abstract class AbstractTOccurrenceSearcher implements IInvertedIndexSearc
}
System.out.println(strBuffer.toString());
}
+
+ /**
+ * Checks whether the underlying inverted list cursor or the final search result has a tuple to return.
+ * When the current source is exhausted, it is reset; unless the search has already finished,
+ * the search is then resumed through continueSearch() and the check is repeated.
+ *
+ * @return true if a tuple is available; false once the current source is exhausted and
+ * isFinishedSearch is set.
+ * @throws HyracksDataException if checking the source, resetting it, or resuming the search fails.
+ */
+ @Override
+ public boolean hasNext() throws HyracksDataException {
+ do {
+ boolean moreToRead = hasMoreElement();
+ if (moreToRead) {
+ return true;
+ }
+ // Current cursor or buffer is exhausted. Unbinds the inverted list cursor or
+ // cleans the output buffer of the final search result.
+ resetResultSource();
+ // Search is done? Then, there's nothing left.
+ if (isFinishedSearch) {
+ return false;
+ }
+ // Otherwise, resume the search process.
+ continueSearch();
+ } while (true);
+ }
+
+ @Override
+ public void next() throws HyracksDataException {
+ // Case 1: fetching a tuple from an inverted list cursor; the cursor advances itself.
+ if (isSingleInvertedList) {
+ singleInvListCursor.next();
+ } else {
+ // Case 2: bind searchResultTuple to the current tuple in the final result's output frame, then advance.
+ searchResultTuple.reset(searchResultFta.getBuffer().array(),
+ searchResultFta.getTupleStartOffset(searchResultTupleIndex));
+ searchResultTupleIndex++;
+ }
+ }
+
+ private boolean hasMoreElement() throws HyracksDataException {
+ // Case #1: single inverted list cursor - the answer comes straight from the cursor.
+ if (isSingleInvertedList) {
+ return singleInvListCursor.hasNext();
+ }
+ // Case #2: output buffer from a final search result - true while unread tuples remain in the frame.
+ return searchResultTupleIndex < searchResultFta.getTupleCount();
+ }
+
+ private void resetResultSource() throws HyracksDataException {
+ if (isSingleInvertedList) { // Cursor source: unload its pages, close it, and drop the reference.
+ isSingleInvertedList = false; // NOTE(review): cleared before cleanup, so destroy() skips this cursor if unloadPages() throws.
+ singleInvListCursor.unloadPages();
+ singleInvListCursor.close();
+ singleInvListCursor = null;
+ } else { // Frame source: reset the final result's appender and restart reading at tuple 0.
+ finalSearchResult.resetBuffer();
+ searchResultTupleIndex = 0;
+ }
+ }
+
+    /**
+     * Releases every frame held by this searcher: the query token frame, the pages of a still-bound
+     * single inverted list cursor, the frame of the final search result, and the frames of the
+     * intermediate search results owned by the inverted list merger.
+     * Each release step is attempted even when an earlier step throws, so that one failure does not
+     * leave the remaining frames unreleased. If several steps fail, the last exception propagates.
+     *
+     * @throws HyracksDataException if any release step fails.
+     */
+    public void destroy() throws HyracksDataException {
+        try {
+            // To ensure to release the buffer of the query token frame.
+            ((BufferManagerBackedVSizeFrame) queryTokenFrame).destroy();
+        } finally {
+            try {
+                // Releases the frames of the cursor.
+                if (isSingleInvertedList && singleInvListCursor != null) {
+                    try {
+                        singleInvListCursor.unloadPages();
+                    } finally {
+                        // Close the cursor even if unloading its pages failed.
+                        singleInvListCursor.close();
+                    }
+                }
+            } finally {
+                try {
+                    // Releases the frame of the final search result.
+                    finalSearchResult.close();
+                } finally {
+                    // Releases the frames of the two intermediate search results.
+                    invListMerger.close();
+                }
+            }
+        }
+    }
+
+ @Override
+ public ITupleReference getTuple() {
+ if (isSingleInvertedList) { // Cursor source: the tuple comes from the cursor.
+ return singleInvListCursor.getTuple();
+ }
+ return searchResultTuple; // Frame source: the reference that next() last bound to the output frame.
+ }
+
+ /**
+ * Prepares the search process. This mainly allocates/clears the buffer frames of each component
+ * (the final search result, the inverted list merger and the query token frame) and resets the
+ * iteration state so that results are read from the final search result's frame, starting at tuple 0,
+ * with no single inverted list cursor bound.
+ *
+ * @throws HyracksDataException if one of the components fails to prepare its frame.
+ */
+ protected void prepareSearch() throws HyracksDataException {
+ finalSearchResult.prepareIOBuffer();
+ invListMerger.prepareMerge();
+ ((BufferManagerBackedVSizeFrame) queryTokenFrame).acquireFrame();
+ isFinishedSearch = false;
+ isSingleInvertedList = false;
+ searchResultFta.reset(finalSearchResult.getNextFrame()); // Point the reader at the final result's frame.
+ searchResultTupleIndex = 0;
+ singleInvListCursor = null;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afe0d3d9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/InvertedIndexFinalSearchResult.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/InvertedIndexFinalSearchResult.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/InvertedIndexFinalSearchResult.java
new file mode 100644
index 0000000..55acb33
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/InvertedIndexFinalSearchResult.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.invertedindex.search;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.dataflow.std.buffermanager.ISimpleFrameBufferManager;
+
+/**
+ * This is an in-memory based storage for final results of inverted-index searches.
+ * Only one frame is used at a time. The same frame will be used multiple times:
+ * append() reports a full frame to the caller instead of moving to another frame, and
+ * resetBuffer() makes the frame writable again. Unlike the superclass, the stored tuples
+ * hold only the inverted list fields, without an occurrence count.
+ */
+public class InvertedIndexFinalSearchResult extends InvertedIndexSearchResult {
+
+ public InvertedIndexFinalSearchResult(ITypeTraits[] invListFields, IHyracksTaskContext ctx,
+ ISimpleFrameBufferManager bufferManager) throws HyracksDataException {
+ super(invListFields, ctx, bufferManager); // The superclass constructor calls the overridden initTypeTraits().
+ }
+
+ /**
+ * The final search result only needs to keep the inverted list fields, not its count.
+ * Copies the given traits into typeTraits and sets invListElementSize to the sum of their fixed lengths.
+ *
+ * @param invListFields type traits of the inverted list element fields.
+ */
+ @Override
+ protected void initTypeTraits(ITypeTraits[] invListFields) {
+ typeTraits = new ITypeTraits[invListFields.length];
+ int tmp = 0;
+ for (int i = 0; i < invListFields.length; i++) {
+ typeTraits[i] = invListFields[i];
+ tmp += invListFields[i].getFixedLength();
+ }
+ invListElementSize = tmp;
+ }
+
+ /**
+ * Prepares the write operation. A result of the final search result will be always in memory.
+ *
+ * @param numExpectedPages not used by this override; no additional buffer is allocated here.
+ */
+ @Override
+ public void prepareWrite(int numExpectedPages) throws HyracksDataException {
+ // Final search result: we will use the ioBuffer and we will not create any file.
+ // This method can be called multiple times in case of the partitioned T-Occurrence search.
+ // For those cases, if the write process has already begun, we should not clear the buffer. NOTE(review): no such check is visible in this method; confirm resetAppenderLocation() keeps the tuples already written.
+ isInMemoryOpMode = true;
+ isFileOpened = false;
+ resetAppenderLocation(IO_BUFFER_IDX);
+ isWriteFinished = false;
+ }
+
+ /**
+ * Appends an element to the frame of this result. When processing the final list,
+ * it does not create an additional frame when a frame becomes full to let the caller consume the frame.
+ *
+ * @param invListElement the element to append; invListElementSize bytes are copied starting at field 0.
+ * @param count ignored by this override; the final result stores no occurrence count.
+ * @return false if the current frame for the final result is full.
+ * true otherwise.
+ * @throws HyracksDataException if the appender rejects the element although it reported free space.
+ */
+ @Override
+ public boolean append(ITupleReference invListElement, int count) throws HyracksDataException {
+ // Pauses the addition of this tuple if the current page is full.
+ if (!appender.hasSpace()) {
+ return false;
+ }
+ // Appends the given inverted-list element.
+ if (!appender.append(invListElement.getFieldData(0), invListElement.getFieldStart(0), invListElementSize)) {
+ throw HyracksDataException.create(ErrorCode.CANNOT_ADD_ELEMENT_TO_INVERTED_INDEX_SEARCH_RESULT);
+ }
+ appender.incrementTupleCount(1);
+ numResults++;
+
+ return true;
+ }
+
+ /**
+ * Finalizes the write operation. This override only sets the isWriteFinished flag;
+ * repeated calls have no further effect.
+ */
+ @Override
+ public void finalizeWrite() throws HyracksDataException {
+ if (isWriteFinished) {
+ return;
+ }
+ isWriteFinished = true;
+ }
+
+ /**
+ * Prepares a read operation. This override only resets the reader buffer index and sets the
+ * isInReadMode flag; it does nothing when the result is already in read mode.
+ */
+ @Override
+ public void prepareResultRead() throws HyracksDataException {
+ if (isInReadMode) {
+ return;
+ }
+ currentReaderBufIdx = 0;
+ isInReadMode = true;
+ }
+
+ /**
+ * Gets the frame of the final result. No result file is involved and no reader index is advanced:
+ * every call returns the buffer stored at IO_BUFFER_IDX.
+ * Since deallocateIOBuffer() empties the buffers list, calling this afterwards fails with an
+ * IndexOutOfBoundsException until the list is filled again (presumably by prepareIOBuffer() - confirm).
+ *
+ * @return the buffer at IO_BUFFER_IDX.
+ */
+ @Override
+ public ByteBuffer getNextFrame() throws HyracksDataException {
+ return buffers.get(IO_BUFFER_IDX);
+ }
+
+ /**
+ * Finishes reading the result and frees the I/O buffer only if requested.
+ *
+ * @param deallocateIOBufferNeeded true to release the I/O buffer; false to leave everything as is.
+ */
+ @Override
+ public void closeResultRead(boolean deallocateIOBufferNeeded) throws HyracksDataException {
+ // Deallocates I/O buffer if requested.
+ if (deallocateIOBufferNeeded) {
+ deallocateIOBuffer();
+ }
+ }
+
+ /**
+ * Deallocates the buffer. Equivalent to calling deallocateIOBuffer().
+ */
+ @Override
+ public void close() throws HyracksDataException {
+ deallocateIOBuffer();
+ }
+
+ @Override
+ public void reset() throws HyracksDataException {
+ // Resets the I/O buffer, then puts every reader/writer reference, mode flag, index and counter back to its initial value.
+ clearBuffer(ioBuffer);
+
+ searchResultWriter = null;
+ searchResultReader = null;
+ isInReadMode = false;
+ isWriteFinished = false;
+ isInMemoryOpMode = false;
+ isFileOpened = false;
+ currentWriterBufIdx = 0;
+ currentReaderBufIdx = 0;
+ numResults = 0;
+ }
+
+ /**
+ * Deallocates the I/O buffer (one frame). This should be the last operation.
+ * Hands the frame back to the buffer manager, empties the buffers list and drops the frame references.
+ * Does nothing when the frame has already been released.
+ */
+ @Override
+ protected void deallocateIOBuffer() throws HyracksDataException {
+ if (ioBufferFrame != null) {
+ bufferManager.releaseFrame(ioBuffer);
+ buffers.clear();
+ ioBufferFrame = null;
+ ioBuffer = null;
+ }
+ }
+
+ /**
+ * Resets the buffer: the appender is reset onto the frame at IO_BUFFER_IDX so that the same frame
+ * can be filled again. Fails with an IndexOutOfBoundsException if the buffers list is empty,
+ * as it is right after deallocateIOBuffer().
+ */
+ public void resetBuffer() {
+ appender.reset(buffers.get(IO_BUFFER_IDX));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afe0d3d9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/InvertedIndexSearchResult.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/InvertedIndexSearchResult.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/InvertedIndexSearchResult.java
new file mode 100644
index 0000000..527d624
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/InvertedIndexSearchResult.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.invertedindex.search;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.ListIterator;
+
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.dataflow.common.io.RunFileReader;
+import org.apache.hyracks.dataflow.common.io.RunFileWriter;
+import org.apache.hyracks.dataflow.std.buffermanager.BufferManagerBackedVSizeFrame;
+import org.apache.hyracks.dataflow.std.buffermanager.ISimpleFrameBufferManager;
+import org.apache.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeFrameTupleAccessor;
+import org.apache.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeFrameTupleAppender;
+import org.apache.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeTupleReference;
+
+/**
+ * Disk-based or in-memory based storage for intermediate and final results of inverted-index
+ * searches. One frame is dedicated to I/O operation for disk operation mode.
+ */
+public class InvertedIndexSearchResult {
+ // The size of count field for each element. Currently, we use an integer value.
+ protected static final int ELEMENT_COUNT_SIZE = 4;
+ // I/O buffer's index in the buffers
+ protected static final int IO_BUFFER_IDX = 0;
+ protected static final String FILE_PREFIX = "InvertedIndexSearchResult";
+ protected final IHyracksTaskContext ctx;
+ protected final FixedSizeFrameTupleAppender appender;
+ protected final FixedSizeFrameTupleAccessor accessor;
+ protected final FixedSizeTupleReference tuple;
+ protected final ISimpleFrameBufferManager bufferManager;
+ protected ITypeTraits[] typeTraits;
+ protected int invListElementSize;
+
+ protected int currentWriterBufIdx;
+ protected int currentReaderBufIdx;
+ protected int numResults;
+ protected int numPossibleElementPerPage;
+ // Read and Write I/O buffer
+ protected IFrame ioBufferFrame = null;
+ protected ByteBuffer ioBuffer = null;
+ // Buffers for in-memory operation mode. The first buffer is the ioBuffer.
+ // In case of the final search result, we will only use the first buffer. No file will be created.
+ protected ArrayList<ByteBuffer> buffers;
+
+ protected RunFileWriter searchResultWriter;
+ protected RunFileReader searchResultReader;
+ protected boolean isInMemoryOpMode;
+ protected boolean isInReadMode;
+ protected boolean isWriteFinished;
+ protected boolean isFileOpened;
+
+ public InvertedIndexSearchResult(ITypeTraits[] invListFields, IHyracksTaskContext ctx,
+ ISimpleFrameBufferManager bufferManager) throws HyracksDataException {
+ initTypeTraits(invListFields); // Overridable; must run first because the appender/accessor/tuple below are built from typeTraits.
+ this.ctx = ctx;
+ appender = new FixedSizeFrameTupleAppender(ctx.getInitialFrameSize(), typeTraits);
+ accessor = new FixedSizeFrameTupleAccessor(ctx.getInitialFrameSize(), typeTraits);
+ tuple = new FixedSizeTupleReference(typeTraits);
+ this.bufferManager = bufferManager;
+ this.isInReadMode = false;
+ this.isWriteFinished = false;
+ this.isInMemoryOpMode = false;
+ this.isFileOpened = false;
+ this.ioBufferFrame = null;
+ this.ioBuffer = null;
+ this.buffers = null;
+ this.currentWriterBufIdx = 0;
+ this.currentReaderBufIdx = 0;
+ this.numResults = 0;
+ calculateNumElementPerPage(); // Needs ctx and invListElementSize, both set above.
+ // Allocates one frame for read/write operation. Throws if the buffer manager cannot provide it.
+ prepareIOBuffer();
+ }
+
+ /**
+ * Initializes the element type in the search result. In addition to the element, we will keep one more integer
+ * per element to keep its occurrence count.
+ * Also sets invListElementSize to the sum of the fixed lengths of the given fields (the count is not included).
+ *
+ * @param invListFields type traits of the inverted list element fields.
+ */
+ protected void initTypeTraits(ITypeTraits[] invListFields) {
+ typeTraits = new ITypeTraits[invListFields.length + 1];
+ int tmp = 0;
+ for (int i = 0; i < invListFields.length; i++) {
+ typeTraits[i] = invListFields[i];
+ tmp += invListFields[i].getFixedLength();
+ }
+ invListElementSize = tmp;
+ // Integer for counting occurrences; stored as the last field of each result tuple.
+ typeTraits[invListFields.length] = IntegerPointable.TYPE_TRAITS;
+ }
+
+ /**
+ * Prepares the write operation. Tries to allocate buffers for the expected number of pages.
+ * If that is possible, all operations will be executed in memory.
+ * If not, all operations will use a file on disk except for the final search result.
+ * A result of the final search result will be always in memory.
+ * The call does nothing when this result is in read mode, when the write has already been finalized,
+ * or when a run file writer already exists.
+ *
+ * @param numExpectedPages the number of pages passed to tryAllocateBuffers().
+ * @throws HyracksDataException if allocating buffers or creating the file fails.
+ */
+ public void prepareWrite(int numExpectedPages) throws HyracksDataException {
+ if (isInReadMode || isWriteFinished || searchResultWriter != null) {
+ return;
+ }
+ // Intermediate results? disk or in-memory based
+ // Allocates more buffers.
+ isInMemoryOpMode = tryAllocateBuffers(numExpectedPages);
+ if (!isInMemoryOpMode) {
+ // Not enough number of buffers. Switch to the file I/O mode.
+ createAndOpenFile();
+ }
+ appender.reset(ioBuffer); // Writing always starts in the I/O buffer, in both modes.
+ isWriteFinished = false;
+ }
+
+ /**
+ * Appends an element and its count to the current frame of this result. The boolean value is necessary for
+ * the final search result case since the append() of that class is overriding this method.
+ * When the current frame is full, writing moves on: in memory mode to the next buffer in the buffers list,
+ * in disk mode by handing the I/O buffer to the run file writer and reusing it.
+ * NOTE(review): in memory mode the next buffer is fetched without a bounds check; this presumably relies on
+ * prepareWrite() having been given a large enough page estimate - confirm.
+ *
+ * @param invListElement the element to append; invListElementSize bytes are copied starting at field 0.
+ * @param count the occurrence count stored after the element.
+ * @return always true.
+ * @throws HyracksDataException if the appender rejects the element or the count, or if writing a frame fails.
+ */
+ public boolean append(ITupleReference invListElement, int count) throws HyracksDataException {
+ ByteBuffer currentBuffer;
+ // Moves to the next page if the current page is full.
+ if (!appender.hasSpace()) {
+ currentWriterBufIdx++;
+ if (isInMemoryOpMode) {
+ currentBuffer = buffers.get(currentWriterBufIdx);
+ } else {
+ searchResultWriter.nextFrame(ioBuffer);
+ currentBuffer = ioBuffer;
+ }
+ appender.reset(currentBuffer);
+ }
+ // Appends inverted-list element.
+ if (!appender.append(invListElement.getFieldData(0), invListElement.getFieldStart(0), invListElementSize)) {
+ throw HyracksDataException.create(ErrorCode.CANNOT_ADD_ELEMENT_TO_INVERTED_INDEX_SEARCH_RESULT);
+ }
+ // Appends count.
+ if (!appender.append(count)) {
+ throw HyracksDataException.create(ErrorCode.CANNOT_ADD_ELEMENT_TO_INVERTED_INDEX_SEARCH_RESULT);
+ }
+ appender.incrementTupleCount(1);
+ numResults++;
+
+ // Always true for the intermediate result. An append should not fail.
+ return true;
+ }
+
+ /**
+ * Finalizes the write operation. After this, no more write operation can be conducted.
+ * In disk mode, the content of the I/O buffer is passed to the run file writer as the last frame
+ * before the writer is closed. Repeated calls have no further effect.
+ *
+ * @throws HyracksDataException if writing the last frame or closing the writer fails.
+ */
+ public void finalizeWrite() throws HyracksDataException {
+ if (isWriteFinished) {
+ return;
+ }
+ // For in-memory operation (including the final result), no specific operations are required.
+ // For disk-based operation, needs to close the writer.
+ if (!isInMemoryOpMode && searchResultWriter != null) {
+ searchResultWriter.nextFrame(ioBuffer);
+ searchResultWriter.close();
+ }
+ isWriteFinished = true;
+ }
+
+ /**
+ * Prepares a read operation. In disk mode, the write is finalized first if needed, and a reader is
+ * created from the run file writer and opened. In both modes the reader buffer index is reset and
+ * the result enters read mode. Does nothing when the result is already in read mode.
+ *
+ * @throws HyracksDataException if finalizing the write or opening the reader fails.
+ */
+ public void prepareResultRead() throws HyracksDataException {
+ if (isInReadMode) {
+ return;
+ }
+ // No specific operation is required for in-memory mode (including the final result).
+ if (!isInMemoryOpMode && searchResultWriter != null) {
+ if (!isWriteFinished) {
+ finalizeWrite();
+ }
+ searchResultReader = searchResultWriter.createDeleteOnCloseReader();
+ searchResultReader.open();
+ searchResultReader.setDeleteAfterClose(true);
+ }
+ currentReaderBufIdx = 0;
+ isInReadMode = true;
+ }
+
+ /**
+ * Gets the next frame of the current result. A caller should make sure that prepareResultRead() is called first,
+ * since that is where the reader index is reset and, in disk mode, the reader is opened.
+ * In memory mode, the buffer at the reader index is returned and the index advances; the index is not
+ * checked against the number of buffers here. In disk mode, the next frame is read into the I/O buffer.
+ *
+ * @return the next frame, or null in disk mode when there is no reader or the reader has no more frames.
+ * @throws HyracksDataException if reading the next frame from the file fails.
+ */
+ public ByteBuffer getNextFrame() throws HyracksDataException {
+ ByteBuffer returnedBuffer = null;
+ if (isInMemoryOpMode) {
+ // In-memory mode for an intermediate search result
+ returnedBuffer = buffers.get(currentReaderBufIdx);
+ currentReaderBufIdx++;
+ } else if (searchResultReader != null && searchResultReader.nextFrame(ioBufferFrame)) {
+ // Disk-based mode for an intermediate search result
+ returnedBuffer = ioBufferFrame.getBuffer();
+ }
+ return returnedBuffer;
+ }
+
+ /**
+ * Finishes reading the result and frees the buffer.
+ * In memory mode the result buffers are deallocated; in disk mode the reader, if any, is closed.
+ *
+ * @param deallocateIOBufferNeeded true to release the I/O buffer as well.
+ * @throws HyracksDataException if closing the reader or releasing a buffer fails.
+ */
+ public void closeResultRead(boolean deallocateIOBufferNeeded) throws HyracksDataException {
+ if (isInMemoryOpMode) {
+ // In-memory mode? Releases all buffers for an intermediate search result.
+ deallocateBuffers();
+ } else if (searchResultReader != null) {
+ // Disk mode? Closes the file handle (this should delete the file also.)
+ searchResultReader.close();
+ }
+
+ // Deallocates I/O buffer if requested.
+ if (deallocateIOBufferNeeded) {
+ deallocateIOBuffer();
+ }
+ }
+
+ public int getCurrentBufferIndex() {
+ return currentWriterBufIdx; // Index of the frame currently being written; incremented by append() on a full frame.
+ }
+
+ public ITypeTraits[] getTypeTraits() {
+ return typeTraits; // The internal array itself, not a copy.
+ }
+
+ public int getNumResults() {
+ return numResults; // Number of elements appended since construction or the last reset().
+ }
+
+ /**
+ * Deletes any associated file and deallocates all buffers.
+ * In memory mode the result buffers are deallocated. In disk mode the reader is closed if one exists;
+ * otherwise the writer, if any, is asked to erase its file. The I/O buffer is released in every case.
+ *
+ * @throws HyracksDataException if closing the reader, erasing the file or releasing a buffer fails.
+ */
+ public void close() throws HyracksDataException {
+ if (isInMemoryOpMode) {
+ deallocateBuffers();
+ } else {
+ if (searchResultReader != null) {
+ searchResultReader.close();
+ } else if (searchResultWriter != null) {
+ searchResultWriter.erase();
+ }
+ }
+ deallocateIOBuffer();
+ }
+
+ public void reset() throws HyracksDataException {
+ // Removes the file if it was in the disk op mode: through the reader if one exists, otherwise through the writer.
+ if (searchResultReader != null) {
+ searchResultReader.close();
+ } else if (searchResultWriter != null) {
+ searchResultWriter.erase();
+ } else if (buffers.size() > 1) {
+ // In-memory mode? Deallocates all buffers. (More than one entry means buffers beyond the I/O buffer exist.)
+ deallocateBuffers();
+ }
+
+ // Resets the I/O buffer. The frame itself is kept; only the state below returns to its initial value.
+ clearBuffer(ioBuffer);
+
+ searchResultWriter = null;
+ searchResultReader = null;
+ isInReadMode = false;
+ isWriteFinished = false;
+ isInMemoryOpMode = false;
+ isFileOpened = false;
+ currentWriterBufIdx = 0;
+ currentReaderBufIdx = 0;
+ numResults = 0;
+ }
+
+ /**
+ * Gets the expected number of pages if all elements are created as a result.
+ * An assumption is that there are no common elements between the previous result and the cursor.
+ */
+ public int getExpectedNumPages(int numExpectedElements) {
+ return (int) Math.ceil((double) numExpectedElements / numPossibleElementPerPage);
+ }
+
+ // Calculates the number of possible elements per page based on the inverted list element size
+ // and stores it in numPossibleElementPerPage.
+ protected void calculateNumElementPerPage() {
+     int initialFrameSize = ctx.getInitialFrameSize();
+     // The space for the count of Minframe and for the count of tuples in a frame can't hold elements.
+     int reservedBytes =
+             FixedSizeFrameTupleAppender.MINFRAME_COUNT_SIZE + FixedSizeFrameTupleAppender.TUPLE_COUNT_SIZE;
+     int usableBytes = initialFrameSize - reservedBytes;
+     int bytesPerElement = invListElementSize + ELEMENT_COUNT_SIZE;
+     numPossibleElementPerPage = (int) Math.floor((double) usableBytes / bytesPerElement);
+ }
+
+ /**
+  * Allocates the buffer for read/write operation and initializes the buffers array that will be used to keep
+  * a result. If the I/O buffer frame already exists, the I/O buffer is only cleared.
+  *
+  * @throws HyracksDataException
+  *             with NOT_ENOUGH_BUDGET_FOR_TEXTSEARCH if no buffer is available for the I/O frame
+  */
+ protected void prepareIOBuffer() throws HyracksDataException {
+     if (ioBufferFrame != null) {
+         clearBuffer(ioBuffer);
+         return;
+     }
+     // Uses locals first so that a failed allocation does not leave ioBufferFrame set while ioBuffer is null.
+     BufferManagerBackedVSizeFrame newFrame = new BufferManagerBackedVSizeFrame(ctx, bufferManager);
+     ByteBuffer newBuffer = newFrame.getBuffer();
+     if (newBuffer == null) {
+         // One frame should be allocated for conducting read/write
+         // operation. Otherwise, can't store the result.
+         throw HyracksDataException.create(ErrorCode.NOT_ENOUGH_BUDGET_FOR_TEXTSEARCH,
+                 this.getClass().getSimpleName());
+     }
+     ioBufferFrame = newFrame;
+     ioBuffer = newBuffer;
+     clearBuffer(ioBuffer);
+     // For keeping the results in memory if possible. The I/O buffer is the first entry.
+     buffers = new ArrayList<ByteBuffer>();
+     buffers.add(ioBuffer);
+ }
+
+ /**
+  * Tries to allocate buffers to accommodate the results in memory. Frames are acquired one by one until the
+  * buffer list holds the given number of pages.
+  *
+  * @param numExpectedPages
+  *            the number of buffers that the buffer list should hold
+  * @return true if the buffer list reached the given size, false if a frame could not be acquired
+  */
+ protected boolean tryAllocateBuffers(int numExpectedPages) throws HyracksDataException {
+     while (buffers.size() < numExpectedPages) {
+         ByteBuffer acquired = bufferManager.acquireFrame(ctx.getInitialFrameSize());
+         if (acquired == null) {
+             // Budget exhausted: the buffers acquired so far stay in the list.
+             return false;
+         }
+         clearBuffer(acquired);
+         buffers.add(acquired);
+     }
+     return true;
+ }
+
+ // Creates a workspace file for the writer and opens it, unless a writer already exists.
+ protected void createAndOpenFile() throws HyracksDataException {
+     // In-memory mode should not generate a file, and an existing writer is kept as it is.
+     if (isInMemoryOpMode || searchResultWriter != null) {
+         return;
+     }
+     FileReference resultFile = ctx.getJobletContext().createManagedWorkspaceFile(FILE_PREFIX);
+     searchResultWriter = new RunFileWriter(resultFile, ctx.getIoManager());
+     searchResultWriter.open();
+     isFileOpened = true;
+ }
+
+ // Deallocates the I/O buffer (one frame) and empties the buffer list. This should be the last operation.
+ protected void deallocateIOBuffer() throws HyracksDataException {
+     if (ioBufferFrame == null) {
+         // No I/O buffer frame is set, so there is nothing to release.
+         return;
+     }
+     bufferManager.releaseFrame(ioBuffer);
+     buffers.clear();
+     ioBuffer = null;
+     ioBufferFrame = null;
+ }
+
+ /**
+  * Deallocates the buffers, going from the last one backward. We do not remove the first buffer since it can
+  * be used as an I/O buffer.
+  */
+ protected void deallocateBuffers() throws HyracksDataException {
+     // Index 0 is never visited, so the first buffer stays in the list.
+     for (int idx = buffers.size() - 1; idx > 0; idx--) {
+         ByteBuffer victim = buffers.get(idx);
+         bufferManager.releaseFrame(victim);
+         buffers.remove(idx);
+     }
+ }
+
+ /**
+  * Returns the fixed-size frame tuple accessor of this search result.
+  */
+ public FixedSizeFrameTupleAccessor getAccessor() {
+     return this.accessor;
+ }
+
+ /**
+  * Returns the fixed-size frame tuple appender of this search result.
+  */
+ public FixedSizeFrameTupleAppender getAppender() {
+     return this.appender;
+ }
+
+ /**
+  * Returns the fixed-size tuple reference of this search result.
+  */
+ public FixedSizeTupleReference getTuple() {
+     return this.tuple;
+ }
+
+ /**
+  * Fills the whole backing array of the given buffer with zeros and then clears the buffer
+  * (position, limit, and mark).
+  *
+  * @param bufferToClear
+  *            an array-backed buffer
+  */
+ protected void clearBuffer(ByteBuffer bufferToClear) {
+     byte[] backingArray = bufferToClear.array();
+     Arrays.fill(backingArray, (byte) 0);
+     bufferToClear.clear();
+ }
+
+ /**
+  * Points the accessor and the appender to the buffer at the given index. The appender is reset with the
+  * tuple count of that buffer and the end offset of its last tuple, both taken from the accessor.
+  *
+  * @param bufferIdx
+  *            the index of the target buffer in the buffer list
+  */
+ protected void resetAppenderLocation(int bufferIdx) {
+     ByteBuffer targetBuffer = buffers.get(bufferIdx);
+     accessor.reset(targetBuffer);
+     int tupleCount = accessor.getTupleCount();
+     int lastTupleEndOffset = accessor.getTupleEndOffset(tupleCount - 1);
+     appender.reset(targetBuffer, false, tupleCount, lastTupleEndOffset);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afe0d3d9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/InvertedListCursorFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/InvertedListCursorFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/InvertedListCursorFactory.java
index 8ed80f6..d0337cb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/InvertedListCursorFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/InvertedListCursorFactory.java
@@ -19,20 +19,24 @@
package org.apache.hyracks.storage.am.lsm.invertedindex.search;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInPlaceInvertedIndex;
-import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
+import org.apache.hyracks.storage.am.lsm.invertedindex.api.InvertedListCursor;
import org.apache.hyracks.storage.am.lsm.invertedindex.api.IObjectFactory;
-public class InvertedListCursorFactory implements IObjectFactory<IInvertedListCursor> {
+public class InvertedListCursorFactory implements IObjectFactory<InvertedListCursor> {
private final IInPlaceInvertedIndex invIndex;
+ private final IHyracksTaskContext ctx;
- public InvertedListCursorFactory(IInPlaceInvertedIndex invIndex) {
+ public InvertedListCursorFactory(IInPlaceInvertedIndex invIndex, IHyracksTaskContext ctx) {
this.invIndex = invIndex;
+ this.ctx = ctx;
}
@Override
- public IInvertedListCursor create() {
- return invIndex.createInvertedListCursor();
+ public InvertedListCursor create() throws HyracksDataException {
+ return invIndex.createInvertedListCursor(ctx);
}
}