You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by dl...@apache.org on 2020/08/31 18:55:21 UTC

[asterixdb] branch master updated: [NO ISSUE] Refactored Merge Join to use Cursor

This is an automated email from the ASF dual-hosted git repository.

dlych pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8778756  [NO ISSUE] Refactored Merge Join to use Cursor
8778756 is described below

commit 8778756df1d151d0d64112c0fce6fc1eabf1c789
Author: caherbel <ca...@gmail.com>
AuthorDate: Sun Aug 30 18:28:34 2020 -0600

    [NO ISSUE] Refactored Merge Join to use Cursor
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    - Adds cursors for ITuplePointerAccessor and IFrameTupleAccessor
      behind a common interface (ITupleCursor)
    - Minor refactoring
    - Removes ITupleAccessor
    
    Change-Id: I3134e77d521cf192e9948912720414f868cdc83b
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/7703
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Dmitry Lychagin <dm...@couchbase.com>
---
 .../joins/interval/IntervalMergeJoiner.java        | 171 ++++++++----------
 .../utils/memory/AbstractTupleAccessor.java        | 194 ---------------------
 ...TupleAccessor.java => AbstractTupleCursor.java} |  53 ++----
 .../{ITupleAccessor.java => FrameTupleCursor.java} |  57 ++----
 .../joins/interval/utils/memory/ITupleCursor.java  |  85 +++++++++
 .../interval/utils/memory/IntervalJoinUtil.java    |  24 ---
 .../interval/utils/memory/IntervalSideTuple.java   |  42 ++---
 ...ntervalVariableDeletableTupleMemoryManager.java | 112 ------------
 .../joins/interval/utils/memory/RunFileStream.java |  29 +--
 .../joins/interval/utils/memory/TupleAccessor.java | 118 -------------
 ...ITupleAccessor.java => TuplePointerCursor.java} |  66 +++----
 11 files changed, 244 insertions(+), 707 deletions(-)

diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/IntervalMergeJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/IntervalMergeJoiner.java
index d1d2310..c19ad06 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/IntervalMergeJoiner.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/IntervalMergeJoiner.java
@@ -19,16 +19,14 @@
 package org.apache.asterix.runtime.operators.joins.interval;
 
 import java.nio.ByteBuffer;
-import java.util.Iterator;
 import java.util.LinkedList;
 
 import org.apache.asterix.runtime.operators.joins.interval.utils.IIntervalJoinUtil;
-import org.apache.asterix.runtime.operators.joins.interval.utils.memory.ITupleAccessor;
+import org.apache.asterix.runtime.operators.joins.interval.utils.memory.FrameTupleCursor;
 import org.apache.asterix.runtime.operators.joins.interval.utils.memory.IntervalSideTuple;
-import org.apache.asterix.runtime.operators.joins.interval.utils.memory.IntervalVariableDeletableTupleMemoryManager;
 import org.apache.asterix.runtime.operators.joins.interval.utils.memory.RunFilePointer;
 import org.apache.asterix.runtime.operators.joins.interval.utils.memory.RunFileStream;
-import org.apache.asterix.runtime.operators.joins.interval.utils.memory.TupleAccessor;
+import org.apache.asterix.runtime.operators.joins.interval.utils.memory.TuplePointerCursor;
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.comm.IFrameWriter;
@@ -41,6 +39,7 @@ import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.std.buffermanager.DeallocatableFramePool;
 import org.apache.hyracks.dataflow.std.buffermanager.IDeallocatableFramePool;
 import org.apache.hyracks.dataflow.std.buffermanager.IDeletableTupleBufferManager;
+import org.apache.hyracks.dataflow.std.buffermanager.VariableDeletableTupleMemoryManager;
 import org.apache.hyracks.dataflow.std.structures.TuplePointer;
 
 /**
@@ -52,22 +51,9 @@ import org.apache.hyracks.dataflow.std.structures.TuplePointer;
  */
 public class IntervalMergeJoiner {
 
-    public enum TupleStatus {
-        LOADED,
-        EMPTY;
-
-        public boolean isLoaded() {
-            return this.equals(LOADED);
-        }
-
-        public boolean isEmpty() {
-            return this.equals(EMPTY);
-        }
-    }
-
     private final IDeallocatableFramePool framePool;
     private final IDeletableTupleBufferManager bufferManager;
-    private final ITupleAccessor memoryAccessor;
+    private final TuplePointerCursor memoryCursor;
     private final LinkedList<TuplePointer> memoryBuffer = new LinkedList<>();
 
     private final RunFileStream runFileStream;
@@ -84,7 +70,7 @@ public class IntervalMergeJoiner {
 
     protected final IFrame[] inputBuffer;
     protected final FrameTupleAppender resultAppender;
-    protected final ITupleAccessor[] inputAccessor;
+    protected final FrameTupleCursor[] inputCursor;
 
     public IntervalMergeJoiner(IHyracksTaskContext ctx, int memorySize, IIntervalJoinUtil mjc, int buildKeys,
             int probeKeys, RecordDescriptor buildRd, RecordDescriptor probeRd) throws HyracksDataException {
@@ -92,13 +78,13 @@ public class IntervalMergeJoiner {
 
         // Memory (probe buffer)
         if (memorySize < 5) {
-            throw new HyracksDataException(
-                    "MergeJoiner does not have enough memory (needs > 4, got " + memorySize + ").");
+            throw new RuntimeException(
+                    "IntervalMergeJoiner does not have enough memory (needs > 4, got " + memorySize + ").");
         }
 
-        inputAccessor = new TupleAccessor[JOIN_PARTITIONS];
-        inputAccessor[BUILD_PARTITION] = new TupleAccessor(buildRd);
-        inputAccessor[PROBE_PARTITION] = new TupleAccessor(probeRd);
+        inputCursor = new FrameTupleCursor[JOIN_PARTITIONS];
+        inputCursor[BUILD_PARTITION] = new FrameTupleCursor(buildRd);
+        inputCursor[PROBE_PARTITION] = new FrameTupleCursor(probeRd);
 
         inputBuffer = new IFrame[JOIN_PARTITIONS];
         inputBuffer[BUILD_PARTITION] = new VSizeFrame(ctx);
@@ -106,164 +92,143 @@ public class IntervalMergeJoiner {
 
         //Two frames are used for the runfile stream, and one frame for each input (2 outputs).
         framePool = new DeallocatableFramePool(ctx, (memorySize - 4) * ctx.getInitialFrameSize());
-        bufferManager = new IntervalVariableDeletableTupleMemoryManager(framePool, probeRd);
-        memoryAccessor = ((IntervalVariableDeletableTupleMemoryManager) bufferManager).createTupleAccessor();
+        bufferManager = new VariableDeletableTupleMemoryManager(framePool, probeRd);
+        memoryCursor = new TuplePointerCursor(bufferManager.createTuplePointerAccessor());
 
         // Run File and frame cache (build buffer)
-        runFileStream = new RunFileStream(ctx, "ismj-left");
+        runFileStream = new RunFileStream(ctx, "imj-build");
         runFilePointer = new RunFilePointer();
         runFileStream.createRunFileWriting();
         runFileStream.startRunFileWriting();
 
-        memoryTuple = new IntervalSideTuple(mjc, memoryAccessor, probeKeys);
+        memoryTuple = new IntervalSideTuple(mjc, memoryCursor, probeKeys);
 
         inputTuple = new IntervalSideTuple[JOIN_PARTITIONS];
-        inputTuple[PROBE_PARTITION] = new IntervalSideTuple(mjc, inputAccessor[PROBE_PARTITION], probeKeys);
-        inputTuple[BUILD_PARTITION] = new IntervalSideTuple(mjc, inputAccessor[BUILD_PARTITION], buildKeys);
+        inputTuple[PROBE_PARTITION] = new IntervalSideTuple(mjc, inputCursor[PROBE_PARTITION], probeKeys);
+        inputTuple[BUILD_PARTITION] = new IntervalSideTuple(mjc, inputCursor[BUILD_PARTITION], buildKeys);
 
         // Result
         this.resultAppender = new FrameTupleAppender(new VSizeFrame(ctx));
     }
 
     public void processBuildFrame(ByteBuffer buffer) throws HyracksDataException {
-        inputAccessor[BUILD_PARTITION].reset(buffer);
-        for (int x = 0; x < inputAccessor[BUILD_PARTITION].getTupleCount(); x++) {
-            runFileStream.addToRunFile(inputAccessor[BUILD_PARTITION], x);
+        inputCursor[BUILD_PARTITION].reset(buffer);
+        for (int x = 0; x < inputCursor[BUILD_PARTITION].getAccessor().getTupleCount(); x++) {
+            runFileStream.addToRunFile(inputCursor[BUILD_PARTITION].getAccessor(), x);
         }
     }
 
     public void processBuildClose() throws HyracksDataException {
         runFileStream.flushRunFile();
-        runFileStream.startReadingRunFile(inputAccessor[BUILD_PARTITION]);
+        runFileStream.startReadingRunFile(inputCursor[BUILD_PARTITION]);
     }
 
     public void processProbeFrame(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
-        inputAccessor[PROBE_PARTITION].reset(buffer);
-        inputAccessor[PROBE_PARTITION].next();
-
-        TupleStatus buildTs = loadBuildTuple();
-        TupleStatus probeTs = loadProbeTuple();
-        while (buildTs.isLoaded() && probeTs.isLoaded()) {
-            if (probeTs.isLoaded() && mjc.checkToLoadNextProbeTuple(inputAccessor[BUILD_PARTITION],
-                    inputAccessor[BUILD_PARTITION].getTupleId(), inputAccessor[PROBE_PARTITION],
-                    inputAccessor[PROBE_PARTITION].getTupleId())) {
-                // Right side from stream
+        inputCursor[PROBE_PARTITION].reset(buffer);
+        while (buildHasNext() && inputCursor[PROBE_PARTITION].hasNext()) {
+            if (inputCursor[PROBE_PARTITION].hasNext() && mjc.checkToLoadNextProbeTuple(
+                    inputCursor[BUILD_PARTITION].getAccessor(), inputCursor[BUILD_PARTITION].getTupleId() + 1,
+                    inputCursor[PROBE_PARTITION].getAccessor(), inputCursor[PROBE_PARTITION].getTupleId() + 1)) {
+                // Process probe side from stream
+                inputCursor[PROBE_PARTITION].next();
                 processProbeTuple(writer);
-                probeTs = loadProbeTuple();
             } else {
-                // Left side from stream
+                // Process build side from runfile
+                inputCursor[BUILD_PARTITION].next();
                 processBuildTuple(writer);
-                buildTs = loadBuildTuple();
             }
         }
     }
 
     public void processProbeClose(IFrameWriter writer) throws HyracksDataException {
-
-        TupleStatus buildTs = loadBuildTuple();
-        while (buildTs.isLoaded() && memoryHasTuples()) {
-            // Left side from stream
+        while (buildHasNext() && memoryHasTuples()) {
+            // Process build side from runfile
+            inputCursor[BUILD_PARTITION].next();
             processBuildTuple(writer);
-            buildTs = loadBuildTuple();
         }
-
         resultAppender.write(writer, true);
         runFileStream.close();
         runFileStream.removeRunFile();
     }
 
-    private TupleStatus loadProbeTuple() {
-        TupleStatus loaded;
-        if (inputAccessor[PROBE_PARTITION] != null && inputAccessor[PROBE_PARTITION].exists()) {
-            // Still processing frame.
-            loaded = TupleStatus.LOADED;
+    private boolean buildHasNext() throws HyracksDataException {
+        if (!inputCursor[BUILD_PARTITION].hasNext()) {
+            // Must keep condition in a separate `if` due to actions applied in loadNextBuffer.
+            return runFileStream.loadNextBuffer(inputCursor[BUILD_PARTITION]);
         } else {
-            // No more frames or tuples to process.
-            loaded = TupleStatus.EMPTY;
-        }
-        return loaded;
-    }
-
-    private TupleStatus loadBuildTuple() throws HyracksDataException {
-        if (!inputAccessor[BUILD_PARTITION].exists()) {
-            // Must keep condition in a separate if due to actions applied in loadNextBuffer.
-            if (!runFileStream.loadNextBuffer(inputAccessor[BUILD_PARTITION])) {
-                return TupleStatus.EMPTY;
-            }
+            return true;
         }
-        return TupleStatus.LOADED;
     }
 
     private void processBuildTuple(IFrameWriter writer) throws HyracksDataException {
         // Check against memory
         if (memoryHasTuples()) {
             inputTuple[BUILD_PARTITION].loadTuple();
-            Iterator<TuplePointer> memoryIterator = memoryBuffer.iterator();
-            while (memoryIterator.hasNext()) {
-                TuplePointer tp = memoryIterator.next();
-                memoryTuple.setTuple(tp);
+            memoryCursor.reset(memoryBuffer.iterator());
+            while (memoryCursor.hasNext()) {
+                memoryCursor.next();
+                memoryTuple.loadTuple();
                 if (inputTuple[BUILD_PARTITION].removeFromMemory(memoryTuple)) {
                     // remove from memory
-                    bufferManager.deleteTuple(tp);
-                    memoryIterator.remove();
+                    bufferManager.deleteTuple(memoryCursor.getTuplePointer());
+                    memoryCursor.remove();
                     continue;
                 } else if (inputTuple[BUILD_PARTITION].checkForEarlyExit(memoryTuple)) {
                     // No more possible comparisons
                     break;
                 } else if (inputTuple[BUILD_PARTITION].compareJoin(memoryTuple)) {
                     // add to result
-                    addToResult(inputAccessor[BUILD_PARTITION], inputAccessor[BUILD_PARTITION].getTupleId(),
-                            memoryAccessor, tp.getTupleIndex(), writer);
+                    addToResult(inputCursor[BUILD_PARTITION].getAccessor(), inputCursor[BUILD_PARTITION].getTupleId(),
+                            memoryCursor.getAccessor(), memoryCursor.getTupleId(), writer);
                 }
             }
         }
-        inputAccessor[BUILD_PARTITION].next();
     }
 
     private void processProbeTuple(IFrameWriter writer) throws HyracksDataException {
         // append to memory
-        if (mjc.checkToSaveInMemory(inputAccessor[BUILD_PARTITION], inputAccessor[BUILD_PARTITION].getTupleId(),
-                inputAccessor[PROBE_PARTITION], inputAccessor[PROBE_PARTITION].getTupleId())) {
-            if (!addToMemory(inputAccessor[PROBE_PARTITION])) {
-                unfreezeAndClearMemory(writer, inputAccessor[BUILD_PARTITION]);
-                return;
+        // BUILD Cursor is guaranteed to have next
+        if (mjc.checkToSaveInMemory(inputCursor[BUILD_PARTITION].getAccessor(),
+                inputCursor[BUILD_PARTITION].getTupleId() + 1, inputCursor[PROBE_PARTITION].getAccessor(),
+                inputCursor[PROBE_PARTITION].getTupleId())) {
+            if (!addToMemory(inputCursor[PROBE_PARTITION].getAccessor(), inputCursor[PROBE_PARTITION].getTupleId())) {
+                unfreezeAndClearMemory(writer);
+                if (!addToMemory(inputCursor[PROBE_PARTITION].getAccessor(),
+                        inputCursor[PROBE_PARTITION].getTupleId())) {
+                    throw new RuntimeException("Should Never get called.");
+                }
             }
         }
-        inputAccessor[PROBE_PARTITION].next();
     }
 
-    private void unfreezeAndClearMemory(IFrameWriter writer, ITupleAccessor accessor) throws HyracksDataException {
-        runFilePointer.reset(runFileStream.getReadPointer(), inputAccessor[BUILD_PARTITION].getTupleId());
-        TupleStatus buildTs = loadBuildTuple();
-        while (buildTs.isLoaded() && memoryHasTuples()) {
-            // Left side from stream
+    private void unfreezeAndClearMemory(IFrameWriter writer) throws HyracksDataException {
+        runFilePointer.reset(runFileStream.getReadPointer(), inputCursor[BUILD_PARTITION].getTupleId());
+        while (buildHasNext() && memoryHasTuples()) {
+            // Process build side from runfile
+            inputCursor[BUILD_PARTITION].next();
             processBuildTuple(writer);
-            buildTs = loadBuildTuple();
         }
-        // Finish writing
-        runFileStream.flushRunFile();
         // Clear memory
         memoryBuffer.clear();
         bufferManager.reset();
         // Start reading
-        runFileStream.startReadingRunFile(accessor, runFilePointer.getFileOffset());
-        accessor.setTupleId(runFilePointer.getTupleIndex());
-        runFilePointer.reset(-1, -1);
+        runFileStream.startReadingRunFile(inputCursor[BUILD_PARTITION], runFilePointer.getFileOffset());
+        inputCursor[BUILD_PARTITION].resetPosition(runFilePointer.getTupleIndex());
     }
 
-    private boolean addToMemory(ITupleAccessor accessor) throws HyracksDataException {
+    private boolean addToMemory(IFrameTupleAccessor accessor, int tupleId) throws HyracksDataException {
         TuplePointer tp = new TuplePointer();
-        if (bufferManager.insertTuple(accessor, accessor.getTupleId(), tp)) {
+        if (bufferManager.insertTuple(accessor, tupleId, tp)) {
             memoryBuffer.add(tp);
             return true;
         }
         return false;
     }
 
-    private void addToResult(IFrameTupleAccessor accessorLeft, int leftTupleIndex, IFrameTupleAccessor accessorRight,
-            int rightTupleIndex, IFrameWriter writer) throws HyracksDataException {
-        FrameUtils.appendConcatToWriter(writer, resultAppender, accessorLeft, leftTupleIndex, accessorRight,
-                rightTupleIndex);
+    private void addToResult(IFrameTupleAccessor buildAccessor, int buildTupleId, IFrameTupleAccessor probeAccessor,
+            int probeTupleId, IFrameWriter writer) throws HyracksDataException {
+        FrameUtils.appendConcatToWriter(writer, resultAppender, buildAccessor, buildTupleId, probeAccessor,
+                probeTupleId);
     }
 
     private boolean memoryHasTuples() {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/AbstractTupleAccessor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/AbstractTupleAccessor.java
deleted file mode 100644
index 397f959..0000000
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/AbstractTupleAccessor.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.runtime.operators.joins.interval.utils.memory;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.dataflow.std.structures.TuplePointer;
-
-public abstract class AbstractTupleAccessor implements ITupleAccessor {
-    public static final int UNSET = -2;
-    public static final int INITIALIZED = -1;
-
-    protected int tupleId = UNSET;
-
-    protected int frameId;
-
-    protected abstract IFrameTupleAccessor getInnerAccessor();
-
-    protected abstract void resetInnerAccessor(int frameId);
-
-    protected abstract void resetInnerAccessor(TuplePointer tp);
-
-    protected abstract int getFrameCount();
-
-    @Override
-    public int getTupleStartOffset() {
-        return getTupleStartOffset(tupleId);
-    }
-
-    @Override
-    public int getTupleLength() {
-        return getTupleLength(tupleId);
-    }
-
-    @Override
-    public int getAbsFieldStartOffset(int fieldId) {
-        return getAbsoluteFieldStartOffset(tupleId, fieldId);
-    }
-
-    @Override
-    public int getFieldLength(int fieldId) {
-        return getFieldLength(tupleId, fieldId);
-    }
-
-    @Override
-    public ByteBuffer getBuffer() {
-        return getInnerAccessor().getBuffer();
-    }
-
-    @Override
-    public int getFieldCount() {
-        return getInnerAccessor().getFieldCount();
-    }
-
-    @Override
-    public int getFieldSlotsLength() {
-        return getInnerAccessor().getFieldSlotsLength();
-    }
-
-    @Override
-    public int getFieldEndOffset(int tupleIndex, int fIdx) {
-        return getInnerAccessor().getFieldEndOffset(tupleId, fIdx);
-    }
-
-    @Override
-    public int getFieldStartOffset(int tupleIndex, int fIdx) {
-        return getInnerAccessor().getFieldStartOffset(tupleIndex, fIdx);
-    }
-
-    @Override
-    public int getFieldLength(int tupleIndex, int fIdx) {
-        return getInnerAccessor().getFieldLength(tupleIndex, fIdx);
-    }
-
-    @Override
-    public int getTupleLength(int tupleIndex) {
-        return getInnerAccessor().getTupleLength(tupleIndex);
-    }
-
-    @Override
-    public int getTupleEndOffset(int tupleIndex) {
-        return getInnerAccessor().getTupleEndOffset(tupleIndex);
-    }
-
-    @Override
-    public int getTupleStartOffset(int tupleIndex) {
-        return getInnerAccessor().getTupleStartOffset(tupleIndex);
-    }
-
-    @Override
-    public int getAbsoluteFieldStartOffset(int tupleIndex, int fIdx) {
-        return getInnerAccessor().getAbsoluteFieldStartOffset(tupleIndex, fIdx);
-    }
-
-    @Override
-    public int getTupleCount() {
-        return getInnerAccessor().getTupleCount();
-    }
-
-    @Override
-    public void reset(TuplePointer tuplePointer) {
-        resetInnerAccessor(tuplePointer.getFrameIndex());
-    }
-
-    @Override
-    public void reset(ByteBuffer buffer) {
-        throw new IllegalAccessError("Should never call this reset");
-    }
-
-    @Override
-    public int getTupleEndOffset() {
-        return getInnerAccessor().getTupleEndOffset(tupleId);
-    }
-
-    @Override
-    public int getFieldEndOffset(int fieldId) {
-        return getInnerAccessor().getFieldEndOffset(tupleId, fieldId);
-    }
-
-    @Override
-    public int getFieldStartOffset(int fieldId) {
-        return getInnerAccessor().getFieldStartOffset(tupleId, fieldId);
-    }
-
-    @Override
-    public void getTuplePointer(TuplePointer tp) {
-        tp.reset(frameId, tupleId);
-    }
-
-    @Override
-    public int getTupleId() {
-        return tupleId;
-    }
-
-    @Override
-    public void setTupleId(int tupleId) {
-        this.tupleId = tupleId;
-    }
-
-    @Override
-    public void reset() {
-        tupleId = INITIALIZED;
-        frameId = 0;
-        resetInnerAccessor(frameId);
-    }
-
-    @Override
-    public boolean hasNext() {
-        if (tupleId + 1 < getTupleCount() || frameId + 1 < getFrameCount()) {
-            return true;
-        }
-        return false;
-    }
-
-    @Override
-    public boolean exists() {
-        return INITIALIZED < tupleId && getTupleEndOffset(tupleId) > 0 && tupleId < getTupleCount()
-                && frameId < getFrameCount();
-    }
-
-    @Override
-    public void next() {
-        // TODO Consider error messages
-        if (tupleId + 1 < getTupleCount()) {
-            ++tupleId;
-        } else if (frameId + 1 < getFrameCount()) {
-            ++frameId;
-            resetInnerAccessor(frameId);
-            tupleId = 0;
-        } else {
-            // Force exists to fail, by incrementing the tuple pointer.
-            ++tupleId;
-        }
-    }
-
-}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/ITupleAccessor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/AbstractTupleCursor.java
similarity index 55%
copy from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/ITupleAccessor.java
copy to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/AbstractTupleCursor.java
index 77059d6..5734dec 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/ITupleAccessor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/AbstractTupleCursor.java
@@ -16,54 +16,29 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.asterix.runtime.operators.joins.interval.utils.memory;
 
-import java.nio.ByteBuffer;
-
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.dataflow.std.structures.TuplePointer;
-
-public interface ITupleAccessor extends IFrameTupleAccessor {
-    int getTupleStartOffset();
-
-    int getTupleEndOffset();
 
-    int getTupleLength();
+public abstract class AbstractTupleCursor<T> implements ITupleCursor<T> {
 
-    int getAbsFieldStartOffset(int fieldId);
-
-    int getFieldLength(int fieldId);
+    public static final int UNSET = -2;
+    public static final int INITIALIZED = -1;
+    public int tupleId = UNSET;
+    protected IFrameTupleAccessor accessor;
 
     @Override
-    int getFieldCount();
+    public void next() {
+        ++tupleId;
+    }
 
     @Override
-    int getFieldSlotsLength();
-
-    int getFieldEndOffset(int fieldId);
-
-    int getFieldStartOffset(int fieldId);
-
-    void reset(TuplePointer tuplePointer);
+    public IFrameTupleAccessor getAccessor() {
+        return accessor;
+    }
 
     @Override
-    void reset(ByteBuffer buffer);
-
-    int getTupleId();
-
-    void setTupleId(int tupleId);
-
-    void getTuplePointer(TuplePointer tp);
-
-    /**
-     * Only reset the iterator.
-     */
-    void reset();
-
-    boolean hasNext();
-
-    void next();
-
-    boolean exists();
+    public int getTupleId() {
+        return tupleId;
+    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/ITupleAccessor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/FrameTupleCursor.java
similarity index 53%
copy from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/ITupleAccessor.java
copy to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/FrameTupleCursor.java
index 77059d6..b6c9b2e 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/ITupleAccessor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/FrameTupleCursor.java
@@ -16,54 +16,31 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.asterix.runtime.operators.joins.interval.utils.memory;
 
 import java.nio.ByteBuffer;
 
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.dataflow.std.structures.TuplePointer;
-
-public interface ITupleAccessor extends IFrameTupleAccessor {
-    int getTupleStartOffset();
-
-    int getTupleEndOffset();
-
-    int getTupleLength();
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 
-    int getAbsFieldStartOffset(int fieldId);
+public class FrameTupleCursor extends AbstractTupleCursor<ByteBuffer> {
 
-    int getFieldLength(int fieldId);
+    public FrameTupleCursor(RecordDescriptor recordDescriptor) {
+        accessor = new FrameTupleAccessor(recordDescriptor);
+    }
 
     @Override
-    int getFieldCount();
-
-    @Override
-    int getFieldSlotsLength();
-
-    int getFieldEndOffset(int fieldId);
-
-    int getFieldStartOffset(int fieldId);
-
-    void reset(TuplePointer tuplePointer);
+    public boolean hasNext() {
+        return INITIALIZED < (tupleId + 1) && (tupleId + 1) < accessor.getTupleCount();
+    }
 
     @Override
-    void reset(ByteBuffer buffer);
-
-    int getTupleId();
-
-    void setTupleId(int tupleId);
-
-    void getTuplePointer(TuplePointer tp);
-
-    /**
-     * Only reset the iterator.
-     */
-    void reset();
-
-    boolean hasNext();
-
-    void next();
-
-    boolean exists();
+    public void reset(ByteBuffer byteBuffer) {
+        accessor.reset(byteBuffer);
+        tupleId = INITIALIZED;
+    }
+
+    public void resetPosition(int tupleId) {
+        this.tupleId = tupleId;
+    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/ITupleCursor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/ITupleCursor.java
new file mode 100644
index 0000000..8572ceb
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/ITupleCursor.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.operators.joins.interval.utils.memory;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+
+/**
+ * Represents a tuple cursor. The expected use is:
+ * cursor = new cursor();
+ * while(predicate){
+ * -cursor.reset()
+ * -while (cursor.hasNext()){
+ * --cursor.next()
+ * -}
+ * }
+ */
+public interface ITupleCursor<T> {
+
+    /**
+     * Checks whether a tuple exists after the current tuple index.
+     *
+     * @return
+     */
+    boolean hasNext();
+
+    /**
+     * Increments the Tuple Index
+     *
+     */
+    void next();
+
+    /**
+     * Used in FrameTupleCursor to reset the accessor to the buffer
+     *
+     * @param param
+     */
+    void reset(T param);
+
+    /**
+     * Return the accessor
+     *
+     * @return
+     */
+    IFrameTupleAccessor getAccessor();
+
+    /**
+     * Return the tuple id.
+     *
+     * @return
+     */
+    int getTupleId();
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/IntervalJoinUtil.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/IntervalJoinUtil.java
index 7ff5816..8a065f0 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/IntervalJoinUtil.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/IntervalJoinUtil.java
@@ -21,22 +21,12 @@ package org.apache.asterix.runtime.operators.joins.interval.utils.memory;
 import org.apache.asterix.dataflow.data.nontagged.serde.AIntervalSerializerDeserializer;
 import org.apache.asterix.om.pointables.nonvisitor.AIntervalPointable;
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.data.std.primitive.TaggedValuePointable;
 
 public class IntervalJoinUtil {
 
     private IntervalJoinUtil() {
     }
 
-    public static void getIntervalPointable(ITupleAccessor accessor, int fieldId, TaggedValuePointable tvp,
-            AIntervalPointable ip) {
-        int start =
-                accessor.getTupleStartOffset() + accessor.getFieldSlotsLength() + accessor.getFieldStartOffset(fieldId);
-        int length = accessor.getFieldLength(fieldId);
-        tvp.set(accessor.getBuffer().array(), start, length);
-        tvp.getValue(ip);
-    }
-
     public static void getIntervalPointable(IFrameTupleAccessor accessor, int tupleId, int fieldId,
             AIntervalPointable ip) {
         int start = getIntervalOffset(accessor, tupleId, fieldId);
@@ -65,18 +55,4 @@ public class IntervalJoinUtil {
         long intervalEnd = AIntervalSerializerDeserializer.getIntervalEnd(accessor.getBuffer().array(), start);
         return intervalEnd;
     }
-
-    public static long getIntervalStart(ITupleAccessor accessor, int fieldId) {
-        int start = accessor.getTupleStartOffset() + accessor.getFieldSlotsLength()
-                + accessor.getFieldStartOffset(fieldId) + 1;
-        long intervalStart = AIntervalSerializerDeserializer.getIntervalStart(accessor.getBuffer().array(), start);
-        return intervalStart;
-    }
-
-    public static long getIntervalEnd(ITupleAccessor accessor, int fieldId) {
-        int start = accessor.getTupleStartOffset() + accessor.getFieldSlotsLength()
-                + accessor.getFieldStartOffset(fieldId) + 1;
-        long intervalEnd = AIntervalSerializerDeserializer.getIntervalEnd(accessor.getBuffer().array(), start);
-        return intervalEnd;
-    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/IntervalSideTuple.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/IntervalSideTuple.java
index 34c1e0e..ac30067 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/IntervalSideTuple.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/IntervalSideTuple.java
@@ -22,14 +22,11 @@ package org.apache.asterix.runtime.operators.joins.interval.utils.memory;
 import org.apache.asterix.dataflow.data.nontagged.serde.AIntervalSerializerDeserializer;
 import org.apache.asterix.runtime.operators.joins.interval.utils.IIntervalJoinUtil;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.structures.TuplePointer;
 
 public class IntervalSideTuple {
     // Tuple access
     int fieldId;
-    ITupleAccessor accessor;
-    int tupleIndex;
-    int frameIndex = -1;
+    ITupleCursor cursor;
 
     long start;
     long end;
@@ -37,36 +34,24 @@ public class IntervalSideTuple {
     // Join details
     final IIntervalJoinUtil imjc;
 
-    public IntervalSideTuple(IIntervalJoinUtil imjc, ITupleAccessor accessor, int fieldId) {
+    public IntervalSideTuple(IIntervalJoinUtil imjc, ITupleCursor cursor, int fieldId) {
         this.imjc = imjc;
-        this.accessor = accessor;
+        this.cursor = cursor;
         this.fieldId = fieldId;
     }
 
-    public void setTuple(TuplePointer tp) {
-        if (frameIndex != tp.getFrameIndex()) {
-            accessor.reset(tp);
-            frameIndex = tp.getFrameIndex();
-        }
-        tupleIndex = tp.getTupleIndex();
-        int offset = IntervalJoinUtil.getIntervalOffset(accessor, tupleIndex, fieldId);
-        start = AIntervalSerializerDeserializer.getIntervalStart(accessor.getBuffer().array(), offset);
-        end = AIntervalSerializerDeserializer.getIntervalEnd(accessor.getBuffer().array(), offset);
-    }
-
     public void loadTuple() {
-        tupleIndex = accessor.getTupleId();
-        int offset = IntervalJoinUtil.getIntervalOffset(accessor, tupleIndex, fieldId);
-        start = AIntervalSerializerDeserializer.getIntervalStart(accessor.getBuffer().array(), offset);
-        end = AIntervalSerializerDeserializer.getIntervalEnd(accessor.getBuffer().array(), offset);
+        int offset = IntervalJoinUtil.getIntervalOffset(cursor.getAccessor(), cursor.getTupleId(), fieldId);
+        start = AIntervalSerializerDeserializer.getIntervalStart(cursor.getAccessor().getBuffer().array(), offset);
+        end = AIntervalSerializerDeserializer.getIntervalEnd(cursor.getAccessor().getBuffer().array(), offset);
     }
 
     public int getTupleIndex() {
-        return tupleIndex;
+        return cursor.getTupleId();
     }
 
-    public ITupleAccessor getAccessor() {
-        return accessor;
+    public ITupleCursor getCursor() {
+        return cursor;
     }
 
     public long getStart() {
@@ -78,14 +63,17 @@ public class IntervalSideTuple {
     }
 
     public boolean compareJoin(IntervalSideTuple ist) throws HyracksDataException {
-        return imjc.checkToSaveInResult(accessor, tupleIndex, ist.accessor, ist.tupleIndex);
+        return imjc.checkToSaveInResult(cursor.getAccessor(), cursor.getTupleId(), ist.cursor.getAccessor(),
+                ist.cursor.getTupleId());
     }
 
     public boolean removeFromMemory(IntervalSideTuple ist) {
-        return imjc.checkToRemoveInMemory(accessor, tupleIndex, ist.accessor, ist.tupleIndex);
+        return imjc.checkToRemoveInMemory(cursor.getAccessor(), cursor.getTupleId(), ist.cursor.getAccessor(),
+                ist.cursor.getTupleId());
     }
 
     public boolean checkForEarlyExit(IntervalSideTuple ist) {
-        return imjc.checkForEarlyExit(accessor, tupleIndex, ist.accessor, ist.tupleIndex);
+        return imjc.checkForEarlyExit(cursor.getAccessor(), cursor.getTupleId(), ist.cursor.getAccessor(),
+                ist.cursor.getTupleId());
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/IntervalVariableDeletableTupleMemoryManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/IntervalVariableDeletableTupleMemoryManager.java
deleted file mode 100644
index aebb932..0000000
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/IntervalVariableDeletableTupleMemoryManager.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.operators.joins.interval.utils.memory;
-
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.dataflow.std.buffermanager.IFramePool;
-import org.apache.hyracks.dataflow.std.buffermanager.VariableDeletableTupleMemoryManager;
-import org.apache.hyracks.dataflow.std.sort.util.DeletableFrameTupleAppender;
-import org.apache.hyracks.dataflow.std.sort.util.IAppendDeletableFrameTupleAccessor;
-import org.apache.hyracks.dataflow.std.structures.TuplePointer;
-
-public class IntervalVariableDeletableTupleMemoryManager extends VariableDeletableTupleMemoryManager {
-    public IntervalVariableDeletableTupleMemoryManager(IFramePool framePool, RecordDescriptor recordDescriptor) {
-        super(framePool, recordDescriptor);
-    }
-
-    public ITupleAccessor createTupleAccessor() {
-        return new AbstractTupleAccessor() {
-            private IAppendDeletableFrameTupleAccessor bufferAccessor =
-                    new DeletableFrameTupleAppender(recordDescriptor);
-
-            @Override
-            protected IFrameTupleAccessor getInnerAccessor() {
-                return bufferAccessor;
-            }
-
-            protected void resetInnerAccessor(TuplePointer tuplePointer) {
-                bufferAccessor.reset(frames.get(tuplePointer.getFrameIndex()));
-            }
-
-            @Override
-            protected void resetInnerAccessor(int frameIndex) {
-                bufferAccessor.reset(frames.get(frameIndex));
-            }
-
-            @Override
-            protected int getFrameCount() {
-                return frames.size();
-            }
-
-            @Override
-            public boolean hasNext() {
-                return hasNext(frameId, tupleId);
-            }
-
-            @Override
-            public void next() {
-                tupleId = nextTuple(frameId, tupleId);
-                if (tupleId > INITIALIZED) {
-                    return;
-                }
-
-                if (frameId + 1 < getFrameCount()) {
-                    ++frameId;
-                    resetInnerAccessor(frameId);
-                    tupleId = INITIALIZED;
-                    next();
-                }
-            }
-
-            public boolean hasNext(int fId, int tId) {
-                int id = nextTuple(fId, tId);
-                if (id > INITIALIZED) {
-                    return true;
-                }
-                if (fId + 1 < getFrameCount()) {
-                    return hasNext(fId + 1, INITIALIZED);
-                }
-                return false;
-            }
-
-            public int nextTuple(int fId, int tId) {
-                if (fId != frameId) {
-                    resetInnerAccessor(fId);
-                }
-                int id = nextTupleInFrame(tId);
-                if (fId != frameId) {
-                    resetInnerAccessor(frameId);
-                }
-                return id;
-            }
-
-            public int nextTupleInFrame(int tId) {
-                int id = tId;
-                while (id + 1 < getTupleCount()) {
-                    ++id;
-                    if (getTupleEndOffset(id) > 0) {
-                        return id;
-                    }
-                }
-                return UNSET;
-            }
-        };
-    }
-}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/RunFileStream.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/RunFileStream.java
index fafe12e..07b8a00 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/RunFileStream.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/RunFileStream.java
@@ -105,25 +105,29 @@ public class RunFileStream {
         runFileBuffer.reset();
     }
 
-    public void addToRunFile(ITupleAccessor accessor) throws HyracksDataException {
-        int idx = accessor.getTupleId();
-        addToRunFile(accessor, idx);
+    public void addToRunFile(IFrameTupleAccessor accessor, int tupleId) throws HyracksDataException {
+        if (!runFileAppender.append(accessor, tupleId)) {
+            runFileAppender.write(runFileWriter, true);
+            writeCount++;
+            runFileAppender.append(accessor, tupleId);
+        }
+        totalTupleCount++;
     }
 
-    public void addToRunFile(IFrameTupleAccessor accessor, int idx) throws HyracksDataException {
-        if (!runFileAppender.append(accessor, idx)) {
+    public void addToRunFile(FrameTupleCursor cursor) throws HyracksDataException {
+        if (!runFileAppender.append(cursor.getAccessor(), cursor.getTupleId())) {
             runFileAppender.write(runFileWriter, true);
             writeCount++;
-            runFileAppender.append(accessor, idx);
+            runFileAppender.append(cursor.getAccessor(), cursor.getTupleId());
         }
         totalTupleCount++;
     }
 
-    public void startReadingRunFile(ITupleAccessor accessor) throws HyracksDataException {
-        startReadingRunFile(accessor, 0);
+    public void startReadingRunFile(FrameTupleCursor cursor) throws HyracksDataException {
+        startReadingRunFile(cursor, 0);
     }
 
-    public void startReadingRunFile(ITupleAccessor accessor, long startOffset) throws HyracksDataException {
+    public void startReadingRunFile(FrameTupleCursor cursor, long startOffset) throws HyracksDataException {
         if (runFileReader != null) {
             runFileReader.close();
         }
@@ -134,15 +138,14 @@ public class RunFileStream {
         runFileReader.seek(startOffset);
         previousReadPointer = 0;
         // Load first frame
-        loadNextBuffer(accessor);
+        loadNextBuffer(cursor);
     }
 
-    public boolean loadNextBuffer(ITupleAccessor accessor) throws HyracksDataException {
+    public boolean loadNextBuffer(FrameTupleCursor cursor) throws HyracksDataException {
         final long tempFrame = runFileReader.position();
         if (runFileReader.nextFrame(runFileBuffer)) {
             previousReadPointer = tempFrame;
-            accessor.reset(runFileBuffer.getBuffer());
-            accessor.next();
+            cursor.reset(runFileBuffer.getBuffer());
             readCount++;
             return true;
         }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/TupleAccessor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/TupleAccessor.java
deleted file mode 100644
index b97df37..0000000
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/TupleAccessor.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.operators.joins.interval.utils.memory;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.std.structures.TuplePointer;
-
-public class TupleAccessor extends FrameTupleAccessor implements ITupleAccessor {
-    public static final int UNSET = -2;
-    public static final int INITIALIZED = -1;
-    private int tupleId = UNSET;
-
-    public TupleAccessor(RecordDescriptor recordDescriptor) {
-        super(recordDescriptor);
-    }
-
-    @Override
-    public void reset(ByteBuffer buffer) {
-        reset(buffer, 0, buffer.limit());
-        tupleId = INITIALIZED;
-    }
-
-    public void reset(TuplePointer tp) {
-        throw new IllegalAccessError("Should never call this reset");
-    }
-
-    @Override
-    public int getTupleStartOffset() {
-        return getTupleStartOffset(tupleId);
-    }
-
-    @Override
-    public int getTupleEndOffset() {
-        return getTupleStartOffset(tupleId);
-    }
-
-    @Override
-    public int getTupleLength() {
-        return getTupleLength(tupleId);
-    }
-
-    @Override
-    public int getAbsFieldStartOffset(int fieldId) {
-        return getAbsoluteFieldStartOffset(tupleId, fieldId);
-    }
-
-    @Override
-    public int getFieldLength(int fieldId) {
-        return getFieldLength(tupleId, fieldId);
-    }
-
-    @Override
-    public int getFieldEndOffset(int fieldId) {
-        return getFieldEndOffset(tupleId, fieldId);
-    }
-
-    @Override
-    public int getFieldStartOffset(int fieldId) {
-        return getFieldStartOffset(tupleId, fieldId);
-    }
-
-    @Override
-    public int getTupleId() {
-        return tupleId;
-    }
-
-    @Override
-    public void getTuplePointer(TuplePointer tp) {
-        tp.reset(INITIALIZED, tupleId);
-    }
-
-    @Override
-    public void setTupleId(int tupleId) {
-        this.tupleId = tupleId;
-    }
-
-    @Override
-    public void reset() {
-        tupleId = INITIALIZED;
-    }
-
-    @Override
-    public boolean hasNext() {
-        if (tupleId == UNSET) {
-            return false;
-        }
-        return tupleId + 1 < getTupleCount();
-    }
-
-    @Override
-    public void next() {
-        ++tupleId;
-    }
-
-    @Override
-    public boolean exists() {
-        return INITIALIZED < tupleId && tupleId < getTupleCount();
-    }
-}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/ITupleAccessor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/TuplePointerCursor.java
similarity index 53%
rename from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/ITupleAccessor.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/TuplePointerCursor.java
index 77059d6..b7fd6f0 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/ITupleAccessor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/TuplePointerCursor.java
@@ -16,54 +16,46 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.asterix.runtime.operators.joins.interval.utils.memory;
 
-import java.nio.ByteBuffer;
+import java.util.Iterator;
 
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.dataflow.std.buffermanager.ITuplePointerAccessor;
 import org.apache.hyracks.dataflow.std.structures.TuplePointer;
 
-public interface ITupleAccessor extends IFrameTupleAccessor {
-    int getTupleStartOffset();
-
-    int getTupleEndOffset();
+public class TuplePointerCursor extends AbstractTupleCursor<Iterator<TuplePointer>> {
 
-    int getTupleLength();
+    Iterator<TuplePointer> iterator;
+    TuplePointer tp;
 
-    int getAbsFieldStartOffset(int fieldId);
-
-    int getFieldLength(int fieldId);
+    public TuplePointerCursor(ITuplePointerAccessor accessor) {
+        this.accessor = accessor;
+    }
 
     @Override
-    int getFieldCount();
+    public boolean hasNext() {
+        return iterator.hasNext();
+    }
 
     @Override
-    int getFieldSlotsLength();
-
-    int getFieldEndOffset(int fieldId);
-
-    int getFieldStartOffset(int fieldId);
-
-    void reset(TuplePointer tuplePointer);
+    public void next() {
+        TuplePointer tp = iterator.next();
+        this.tp = tp;
+        tupleId = tp.getTupleIndex();
+        ((ITuplePointerAccessor) accessor).reset(tp);
+    }
 
     @Override
-    void reset(ByteBuffer buffer);
-
-    int getTupleId();
-
-    void setTupleId(int tupleId);
-
-    void getTuplePointer(TuplePointer tp);
-
-    /**
-     * Only reset the iterator.
-     */
-    void reset();
-
-    boolean hasNext();
-
-    void next();
-
-    boolean exists();
+    public void reset(Iterator<TuplePointer> iterator) {
+        this.iterator = iterator;
+        tupleId = INITIALIZED;
+    }
+
+    public void remove() {
+        iterator.remove();
+    }
+
+    public TuplePointer getTuplePointer() {
+        return tp;
+    }
 }