You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by pr...@apache.org on 2016/10/17 19:55:34 UTC

[44/50] [abbrv] asterixdb git commit: new partition join algorithm

new partition join algorithm


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/0b900514
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/0b900514
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/0b900514

Branch: refs/heads/ecarm002/interval_join_merge
Commit: 0b90051413c96ae52cc3136d41b5589275f11385
Parents: b34a426
Author: Preston Carman <pr...@apache.org>
Authored: Fri Sep 30 13:56:13 2016 -0700
Committer: Preston Carman <pr...@apache.org>
Committed: Fri Sep 30 13:56:13 2016 -0700

----------------------------------------------------------------------
 .../IntervalPartitionJoinPOperator.java         |   2 +-
 .../InMemoryIntervalPartitionJoin.java          |  98 ---
 ...IntervalPartitionJoinOperatorDescriptor.java | 355 +++++----
 .../IntervalPartitionJoinTaskState.java         |  33 +
 .../IntervalPartitionJoiner.java                | 772 ++++---------------
 .../IntervalPartitionUtil.java                  |  70 --
 ...IntervalPartitionJoinOperatorDescriptor.java | 319 --------
 .../IntervalPartitionJoinTaskState.java         |  33 -
 .../IntervalPartitionJoiner.java                | 288 -------
 9 files changed, 412 insertions(+), 1558 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0b900514/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java
index af77a92..73d159e 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java
@@ -23,7 +23,7 @@ import java.util.List;
 import java.util.logging.Logger;
 
 import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinCheckerFactory;
-import org.apache.asterix.runtime.operators.joins.intervalpartition2.IntervalPartitionJoinOperatorDescriptor;
+import org.apache.asterix.runtime.operators.joins.intervalpartition.IntervalPartitionJoinOperatorDescriptor;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0b900514/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/InMemoryIntervalPartitionJoin.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/InMemoryIntervalPartitionJoin.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/InMemoryIntervalPartitionJoin.java
deleted file mode 100644
index aeea209..0000000
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/InMemoryIntervalPartitionJoin.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.operators.joins.intervalpartition;
-
-import java.util.logging.Logger;
-
-import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinChecker;
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import org.apache.hyracks.dataflow.std.buffermanager.BufferInfo;
-import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager;
-
-public class InMemoryIntervalPartitionJoin {
-
-    private static final Logger LOGGER = Logger.getLogger(InMemoryIntervalPartitionJoin.class.getName());
-
-    private final FrameTupleAccessor accessorBuild;
-    private final FrameTupleAppender appender;
-    private final IFrameBufferManager fbm;
-    private BufferInfo bufferInfo;
-    private final IIntervalMergeJoinChecker imjc;
-
-    private long joinComparisonCount = 0;
-    private long joinResultCount = 0;
-
-    public InMemoryIntervalPartitionJoin(IHyracksTaskContext ctx, IFrameBufferManager fbm,
-            IIntervalMergeJoinChecker imjc, RecordDescriptor buildRd, RecordDescriptor probeRd)
-            throws HyracksDataException {
-        bufferInfo = new BufferInfo(null, -1, -1);
-        this.accessorBuild = new FrameTupleAccessor(buildRd);
-        appender = new FrameTupleAppender(new VSizeFrame(ctx));
-        this.imjc = imjc;
-        this.fbm = fbm;
-        LOGGER.fine(
-                "InMemoryIntervalPartitionJoin has been created for Thread ID " + Thread.currentThread().getId() + ".");
-    }
-
-    public long getComparisonCount() {
-        return joinComparisonCount;
-    }
-
-    public long getResultCount() {
-        return joinResultCount;
-    }
-
-    public void join(IFrameTupleAccessor accessorProbe, int probeTupleIndex, IFrameWriter writer)
-            throws HyracksDataException {
-        if (fbm.getNumFrames() != 0) {
-            fbm.resetIterator();
-            int frameIndex = fbm.next();
-            while (fbm.exists()) {
-                fbm.getFrame(frameIndex, bufferInfo);
-                accessorBuild.reset(bufferInfo.getBuffer());
-                for (int buildTupleIndex = 0; buildTupleIndex < accessorBuild.getTupleCount(); ++buildTupleIndex) {
-                    if (imjc.checkToSaveInResult(accessorBuild, buildTupleIndex, accessorProbe, probeTupleIndex,
-                            false)) {
-                        appendToResult(accessorBuild, buildTupleIndex, accessorProbe, probeTupleIndex, writer);
-                    }
-                    joinComparisonCount++;
-                }
-                frameIndex = fbm.next();
-            }
-        }
-    }
-
-    public void closeJoin(IFrameWriter writer) throws HyracksDataException {
-        appender.write(writer, true);
-    }
-
-    private void appendToResult(IFrameTupleAccessor accessorBuild, int buildSidetIx, IFrameTupleAccessor accessorProbe,
-            int probeSidetIx, IFrameWriter writer) throws HyracksDataException {
-        FrameUtils.appendConcatToWriter(writer, appender, accessorBuild, buildSidetIx, accessorProbe, probeSidetIx);
-        joinResultCount++;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0b900514/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
index b4965ef..ddbe913 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
@@ -20,7 +20,6 @@
 package org.apache.asterix.runtime.operators.joins.intervalpartition;
 
 import java.nio.ByteBuffer;
-import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinChecker;
@@ -36,34 +35,29 @@ import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
 import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 import org.apache.hyracks.dataflow.std.base.RangeId;
+import org.apache.hyracks.dataflow.std.join.MergeBranchStatus.Stage;
+import org.apache.hyracks.dataflow.std.join.MergeJoinLocks;
 import org.apache.hyracks.dataflow.std.misc.RangeForwardOperatorDescriptor.RangeForwardTaskState;
 
 public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDescriptor {
     private static final long serialVersionUID = 1L;
 
-    private static final int BUILD_AND_PARTITION_ACTIVITY_ID = 0;
-    private static final int PARTITION_AND_JOIN_ACTIVITY_ID = 1;
-
-    private static final String PROBE_REL = "RelR";
-    private static final String BUILD_REL = "RelS";
-
-    private final int memsize;
-    private final int[] probeKeys;
-    private final int[] buildKeys;
-
+    private static final int LEFT_ACTIVITY_ID = 0;
+    private static final int RIGHT_ACTIVITY_ID = 1;
+    private final int[] leftKeys;
+    private final int[] rightKeys;
+    private final int memoryForJoin;
+    private final IIntervalMergeJoinCheckerFactory imjcf;
+    private final RangeId rangeId;
     private final int k;
 
     private final int probeKey;
     private final int buildKey;
-    private final IIntervalMergeJoinCheckerFactory imjcf;
-    private final RangeId rangeId;
 
     private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoinOperatorDescriptor.class.getName());
 
@@ -71,180 +65,253 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes
             int[] leftKeys, int[] rightKeys, RecordDescriptor recordDescriptor, IIntervalMergeJoinCheckerFactory imjcf,
             RangeId rangeId) {
         super(spec, 2, 1);
-        this.memsize = memoryForJoin;
+        recordDescriptors[0] = recordDescriptor;
         this.buildKey = leftKeys[0];
         this.probeKey = rightKeys[0];
         this.k = k;
-        this.buildKeys = leftKeys;
-        this.probeKeys = rightKeys;
-        recordDescriptors[0] = recordDescriptor;
+        this.leftKeys = leftKeys;
+        this.rightKeys = rightKeys;
+        this.memoryForJoin = memoryForJoin;
         this.imjcf = imjcf;
         this.rangeId = rangeId;
     }
 
     @Override
     public void contributeActivities(IActivityGraphBuilder builder) {
-        ActivityId p1Aid = new ActivityId(odId, BUILD_AND_PARTITION_ACTIVITY_ID);
-        ActivityId p2Aid = new ActivityId(odId, PARTITION_AND_JOIN_ACTIVITY_ID);
-        IActivity phase1 = new PartitionAndBuildActivityNode(p1Aid, p2Aid);
-        IActivity phase2 = new ProbeAndJoinActivityNode(p2Aid, p1Aid);
+        MergeJoinLocks locks = new MergeJoinLocks();
 
-        builder.addActivity(this, phase1);
-        builder.addSourceEdge(0, phase1, 0);
+        ActivityId leftAid = new ActivityId(odId, LEFT_ACTIVITY_ID);
+        ActivityId rightAid = new ActivityId(odId, RIGHT_ACTIVITY_ID);
 
-        builder.addActivity(this, phase2);
-        builder.addSourceEdge(1, phase2, 0);
+        IActivity leftAN = new LeftJoinerActivityNode(leftAid, rightAid, locks);
+        IActivity rightAN = new RightDataActivityNode(rightAid, leftAid, locks);
 
-        builder.addBlockingEdge(phase1, phase2);
+        builder.addActivity(this, rightAN);
+        builder.addSourceEdge(1, rightAN, 0);
 
-        builder.addTargetEdge(0, phase2, 0);
+        builder.addActivity(this, leftAN);
+        builder.addSourceEdge(0, leftAN, 0);
+        builder.addTargetEdge(0, leftAN, 0);
     }
 
-    public static class BuildAndPartitionTaskState extends AbstractStateObject {
-        private IntervalPartitionJoiner ipj;
-        private int intervalPartitions;
-        private int partition;
-        private int k;
-        private int memoryForJoin;
-
-        private BuildAndPartitionTaskState(JobId jobId, TaskId taskId) {
-            super(jobId, taskId);
-        }
-    }
-
-    private class PartitionAndBuildActivityNode extends AbstractActivityNode {
+    private class LeftJoinerActivityNode extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
 
-        private final ActivityId probeAid;
+        private final MergeJoinLocks locks;
 
-        public PartitionAndBuildActivityNode(ActivityId id, ActivityId probeAid) {
+        public LeftJoinerActivityNode(ActivityId id, ActivityId joinAid, MergeJoinLocks locks) {
             super(id);
-            this.probeAid = probeAid;
+            this.locks = locks;
         }
 
         @Override
-        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-                IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
-
-            final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
-            final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(probeAid, 0);
-
-            return new AbstractUnaryInputSinkOperatorNodePushable() {
-                private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(
-                        ctx.getJobletContext().getJobId(), new TaskId(getActivityId(), partition));
-                private boolean failure = false;
-
-                @Override
-                public void open() throws HyracksDataException {
-                    if (memsize <= 2) {
-                        // Dedicated buffers: One buffer to read and one buffer for output
-                        failure = true;
-                        throw new HyracksDataException("not enough memory for join");
-                    }
-                    state.k = k;
+        public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+                throws HyracksDataException {
+            locks.setPartitions(nPartitions);
+            final RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+            return new LeftJoinerOperator(ctx, partition, inRecordDesc);
+        }
 
-                    RangeForwardTaskState rangeState = RangeForwardTaskState.getRangeState(rangeId.getId(), ctx);
-                    long partitionStart = IntervalPartitionUtil.getStartOfPartition(rangeState.getRangeMap(),
-                            partition);
-                    long partitionEnd = IntervalPartitionUtil.getEndOfPartition(rangeState.getRangeMap(), partition);
-                    ITuplePartitionComputer buildHpc = new IntervalPartitionComputerFactory(buildKey, state.k,
-                            partitionStart, partitionEnd).createPartitioner();
-                    ITuplePartitionComputer probeHpc = new IntervalPartitionComputerFactory(probeKey, state.k,
-                            partitionStart, partitionEnd).createPartitioner();
+        private class LeftJoinerOperator extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
 
-                    state.partition = partition;
-                    state.intervalPartitions = IntervalPartitionUtil.getMaxPartitions(state.k);
-                    state.memoryForJoin = memsize;
-                    IIntervalMergeJoinChecker imjc = imjcf.createMergeJoinChecker(buildKeys, probeKeys, partition, ctx);
-                    state.ipj = new IntervalPartitionJoiner(ctx, state.memoryForJoin, state.k, state.intervalPartitions,
-                            BUILD_REL, PROBE_REL, imjc, buildRd, probeRd, buildHpc, probeHpc);
-
-                    state.ipj.buildInit();
-                    LOGGER.setLevel(Level.FINE);
-                    System.out
-                            .println("IntervalPartitionJoinOperatorDescriptor: Logging level is: " + LOGGER.getLevel());
-                    if (LOGGER.isLoggable(Level.FINE)) {
-                        LOGGER.fine("IntervalPartitionJoin is starting the build phase with " + state.k
-                                + " granules repesenting " + state.intervalPartitions + " interval partitions using "
-                                + state.memoryForJoin + " frames for memory.");
-                    }
-                }
+            private final IHyracksTaskContext ctx;
+            private final int partition;
+            private final RecordDescriptor leftRd;
+            private IntervalPartitionJoinTaskState state;
+            private boolean first = true;
 
-                @Override
-                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                    state.ipj.buildStep(buffer);
-                }
+            public LeftJoinerOperator(IHyracksTaskContext ctx, int partition, RecordDescriptor inRecordDesc) {
+                this.ctx = ctx;
+                this.partition = partition;
+                this.leftRd = inRecordDesc;
+            }
 
-                @Override
-                public void close() throws HyracksDataException {
-                    if (!failure) {
-                        state.ipj.buildClose();
-                        ctx.setStateObject(state);
-                        if (LOGGER.isLoggable(Level.FINE)) {
-                            LOGGER.fine("IntervalPartitionJoin closed its build phase");
+            @Override
+            public void open() throws HyracksDataException {
+                locks.getLock(partition).lock();
+                try {
+                    writer.open();
+                    state = new IntervalPartitionJoinTaskState(ctx.getJobletContext().getJobId(),
+                            new TaskId(getActivityId(), partition));;
+                    state.leftRd = leftRd;
+                    ctx.setStateObject(state);
+                    locks.getRight(partition).signal();
+
+                    do {
+                        // Continue once the joiner has been created in the right branch.
+                        if (state.partitionJoiner == null) {
+                            locks.getLeft(partition).await();
                         }
-                    }
+                    } while (state.partitionJoiner == null);
+                    state.status.branch[LEFT_ACTIVITY_ID].setStageOpen();
+                    locks.getRight(partition).signal();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                } finally {
+                    locks.getLock(partition).unlock();
                 }
-
-                @Override
-                public void fail() throws HyracksDataException {
-                    failure = true;
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                locks.getLock(partition).lock();
+                if (first) {
+                    state.status.branch[LEFT_ACTIVITY_ID].setStageData();
+                    first = false;
                 }
-
-            };
-
+                try {
+                    state.partitionJoiner.setFrame(LEFT_ACTIVITY_ID, buffer);
+                    state.partitionJoiner.processLeftFrame(writer);
+                } finally {
+                    locks.getLock(partition).unlock();
+                }
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                locks.getLock(partition).lock();
+                try {
+                    state.failed = true;
+                } finally {
+                    locks.getLock(partition).unlock();
+                }
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                locks.getLock(partition).lock();
+                try {
+                    state.status.branch[LEFT_ACTIVITY_ID].noMore();
+                    if (state.failed) {
+                        writer.fail();
+                    } else {
+                        state.partitionJoiner.processLeftClose(writer);
+                        writer.close();
+                    }
+                    state.status.branch[LEFT_ACTIVITY_ID].setStageClose();
+                    locks.getRight(partition).signal();
+                } finally {
+                    locks.getLock(partition).unlock();
+                }
+            }
         }
     }
 
-    private class ProbeAndJoinActivityNode extends AbstractActivityNode {
+    private class RightDataActivityNode extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
 
-        public ProbeAndJoinActivityNode(ActivityId id, ActivityId buildAid) {
+        private final ActivityId joinAid;
+        private final MergeJoinLocks locks;
+
+        public RightDataActivityNode(ActivityId id, ActivityId joinAid, MergeJoinLocks locks) {
             super(id);
+            this.joinAid = joinAid;
+            this.locks = locks;
         }
 
         @Override
-        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-                IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
+        public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
                 throws HyracksDataException {
+            locks.setPartitions(nPartitions);
+            RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+            return new RightDataOperator(ctx, partition, inRecordDesc);
+        }
 
-            return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
-                private BuildAndPartitionTaskState state;
-
-                @Override
-                public void open() throws HyracksDataException {
-                    state = (BuildAndPartitionTaskState) ctx.getStateObject(
-                            new TaskId(new ActivityId(getOperatorId(), BUILD_AND_PARTITION_ACTIVITY_ID), partition));
-
-                    writer.open();
-                    state.ipj.probeInit();
-
-                    if (LOGGER.isLoggable(Level.FINE)) {
-                        LOGGER.fine("IntervalPartitionJoin is starting the probe phase.");
-                    }
-                }
+        private class RightDataOperator extends AbstractUnaryInputSinkOperatorNodePushable {
+
+            private int partition;
+            private IHyracksTaskContext ctx;
+            private final RecordDescriptor rightRd;
+            private IntervalPartitionJoinTaskState state;
+            private boolean first = true;
+
+            public RightDataOperator(IHyracksTaskContext ctx, int partition, RecordDescriptor inRecordDesc) {
+                this.ctx = ctx;
+                this.partition = partition;
+                this.rightRd = inRecordDesc;
+            }
+
+            @Override
+            public void open() throws HyracksDataException {
+                locks.getLock(partition).lock();
+                try {
+                    do {
+                        // Wait for the state to be set in the context from Left.
+                        state = (IntervalPartitionJoinTaskState) ctx.getStateObject(new TaskId(joinAid, partition));
+                        if (state == null) {
+                            locks.getRight(partition).await();
+                        }
+                    } while (state == null);
+                    state.k = k;
 
-                @Override
-                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                    state.ipj.probeStep(buffer, writer);
+                    RangeForwardTaskState rangeState = RangeForwardTaskState.getRangeState(rangeId.getId(), ctx);
+                    long partitionStart = IntervalPartitionUtil.getStartOfPartition(rangeState.getRangeMap(),
+                            partition);
+                    long partitionEnd = IntervalPartitionUtil.getEndOfPartition(rangeState.getRangeMap(), partition);
+                    ITuplePartitionComputer buildHpc = new IntervalPartitionComputerFactory(buildKey, state.k,
+                            partitionStart, partitionEnd).createPartitioner();
+                    ITuplePartitionComputer probeHpc = new IntervalPartitionComputerFactory(probeKey, state.k,
+                            partitionStart, partitionEnd).createPartitioner();
+                    IIntervalMergeJoinChecker imjc = imjcf.createMergeJoinChecker(leftKeys, rightKeys, partition, ctx);
+
+                    state.rightRd = rightRd;
+                    state.partitionJoiner = new IntervalPartitionJoiner(ctx, memoryForJoin, partition, state.k,
+                            state.status, locks, imjc, state.leftRd, state.rightRd, buildHpc, probeHpc);
+                    state.status.branch[RIGHT_ACTIVITY_ID].setStageOpen();
+                    locks.getLeft(partition).signal();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                } finally {
+                    locks.getLock(partition).unlock();
                 }
-
-                @Override
-                public void fail() throws HyracksDataException {
-                    writer.fail();
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                locks.getLock(partition).lock();
+                if (first) {
+                    state.status.branch[RIGHT_ACTIVITY_ID].setStageData();
+                    first = false;
                 }
-
-                @Override
-                public void close() throws HyracksDataException {
-                    state.ipj.probeClose(writer);
-                    state.ipj.joinSpilledPartitions(writer);
-                    state.ipj.closeAndDeleteRunFiles();
-                    writer.close();
-                    if (LOGGER.isLoggable(Level.FINE)) {
-                        LOGGER.fine("IntervalPartitionJoin closed its probe phase");
+                try {
+                    while (!state.status.continueRightLoad
+                            && state.status.branch[LEFT_ACTIVITY_ID].getStatus() != Stage.CLOSED) {
+                        // Wait for the state to request right frame unless left has finished.
+                        locks.getRight(partition).await();
                     }
+                    state.partitionJoiner.setFrame(RIGHT_ACTIVITY_ID, buffer);
+                    state.status.continueRightLoad = false;
+                    locks.getLeft(partition).signal();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                } finally {
+                    locks.getLock(partition).unlock();
+                }
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                locks.getLock(partition).lock();
+                try {
+                    state.failed = true;
+                    locks.getLeft(partition).signal();
+                } finally {
+                    locks.getLock(partition).unlock();
+                }
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                locks.getLock(partition).lock();
+                try {
+                    state.status.branch[RIGHT_ACTIVITY_ID].setStageClose();
+                    locks.getLeft(partition).signal();
+                } finally {
+                    locks.getLock(partition).unlock();
                 }
-            };
+            }
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0b900514/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinTaskState.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinTaskState.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinTaskState.java
new file mode 100644
index 0000000..1939899
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinTaskState.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.operators.joins.intervalpartition;
+
+import org.apache.hyracks.api.dataflow.TaskId;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.dataflow.std.join.MergeJoinTaskState;
+
+public class IntervalPartitionJoinTaskState extends MergeJoinTaskState {
+    protected IntervalPartitionJoiner partitionJoiner;
+    public int k;
+
+    public IntervalPartitionJoinTaskState(JobId jobId, TaskId taskId) {
+        super(jobId, taskId);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0b900514/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
index 9c5a872..984db20 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
@@ -18,21 +18,17 @@
  */
 package org.apache.asterix.runtime.operators.joins.intervalpartition;
 
-import java.nio.ByteBuffer;
-import java.util.BitSet;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.Map.Entry;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.TreeMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinChecker;
-import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.api.comm.IFrameTupleAppender;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -41,685 +37,251 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.common.io.RunFileReader;
 import org.apache.hyracks.dataflow.common.io.RunFileWriter;
-import org.apache.hyracks.dataflow.std.buffermanager.IPartitionedMemoryConstrain;
+import org.apache.hyracks.dataflow.std.buffermanager.BufferInfo;
+import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager;
 import org.apache.hyracks.dataflow.std.buffermanager.VPartitionTupleBufferManager;
+import org.apache.hyracks.dataflow.std.join.AbstractMergeJoiner;
+import org.apache.hyracks.dataflow.std.join.MergeJoinLocks;
+import org.apache.hyracks.dataflow.std.join.MergeStatus;
+import org.apache.hyracks.dataflow.std.structures.RunFilePointer;
 import org.apache.hyracks.dataflow.std.structures.TuplePointer;
 
-/**
- * The Interval Partition Join runs in three stages: build, probe-in-memory, probe-spill.
- * build: Saves all build partitions either to memory or disk.
- * probe-in-memory: Joins all in memory partitions and saves the necessary partitions to disk.
- * probe-spill: Spilled build partitions are loaded into memory and joined with all probe remaining partitions.
- */
-public class IntervalPartitionJoiner {
+public class IntervalPartitionJoiner extends AbstractMergeJoiner {
 
     private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoiner.class.getName());
 
-    private enum SIDE {
-        BUILD,
-        PROBE
-    }
-
-    private IHyracksTaskContext ctx;
-
-    private final String buildRelName;
-    private final String probeRelName;
+    private final RunFileWriter probeRunFileWriter;
+    private int probeRunFilePid = -1;
 
     private final ITuplePartitionComputer buildHpc;
     private final ITuplePartitionComputer probeHpc;
 
-    private final RecordDescriptor buildRd;
-    private final RecordDescriptor probeRd;
-
-    private RunFileWriter[] buildRFWriters; //writing spilled build partitions
-    private RunFileWriter[] probeRFWriters; //writing spilled probe partitions
-
     private final int buildMemory;
     private final int k;
     private final int numOfPartitions;
-    private InMemoryIntervalPartitionJoin[] inMemJoiner; //Used for joining resident partitions
-
-    private VPartitionTupleBufferManager buildBufferManager;
+    private long buildSize = 0;
+    private long probeSize = 0;
+    private final TreeMap<RunFilePointer, Integer> probeRunFilePointers;
 
+    private final VPartitionTupleBufferManager buildBufferManager;
+    private final TuplePointer tempPtr = new TuplePointer();
+    private final List<Integer> buildInMemoryPartitions;
     private final FrameTupleAccessor accessorBuild;
-    private final FrameTupleAccessor accessorProbe;
-
-    // stats information
-    private IntervalPartitionJoinData ipjd;
-
-    private IFrame reloadBuffer;
-    private TuplePointer tempPtr = new TuplePointer();
-
-    private IIntervalMergeJoinChecker imjc;
+    private BufferInfo bufferInfo;
 
+    private long spillWriteCount = 0;
+    private long spillReadCount = 0;
     private long joinComparisonCount = 0;
     private long joinResultCount = 0;
-    private long spillReadCount = 0;
-    private long spillWriteCount = 0;
-    private long buildSize;
-    private long probeSize;
-    private int tmp = -1;
-
-    private RunFileWriter probeRunFileWriter = null;
-    private final IFrameTupleAppender probeRunFileAppender;
-    private int probeRunFilePid = -1;
-
-    public IntervalPartitionJoiner(IHyracksTaskContext ctx, int memForJoin, int k, int numOfPartitions,
-            String buildRelName, String probeRelName, IIntervalMergeJoinChecker imjc, RecordDescriptor buildRd,
-            RecordDescriptor probeRd, ITuplePartitionComputer buildHpc, ITuplePartitionComputer probeHpc)
-            throws HyracksDataException {
-        this.ctx = ctx;
-        // TODO fix available memory size
-        this.buildMemory = memForJoin;
-        this.k = k;
-        this.buildRd = buildRd;
-        this.probeRd = probeRd;
-        this.buildHpc = buildHpc;
-        this.probeHpc = probeHpc;
-        this.imjc = imjc;
-        this.buildRelName = buildRelName;
-        this.probeRelName = probeRelName;
+    private final IIntervalMergeJoinChecker imjc;
+    private final FrameTupleAccessor accessorProbe;
+    private final IFrame reloadBuffer;
+    private boolean moreBuildProcessing = true;
+    private final List<IFrameBufferManager> fbms = new ArrayList<>();
 
-        this.numOfPartitions = numOfPartitions;
-        this.buildRFWriters = new RunFileWriter[numOfPartitions];
-        this.probeRFWriters = new RunFileWriter[numOfPartitions];
-        this.inMemJoiner = new InMemoryIntervalPartitionJoin[numOfPartitions];
+    public IntervalPartitionJoiner(IHyracksTaskContext ctx, int memorySize, int partition, int k, MergeStatus status,
+            MergeJoinLocks locks, IIntervalMergeJoinChecker imjc, RecordDescriptor leftRd, RecordDescriptor rightRd,
+            ITuplePartitionComputer buildHpc, ITuplePartitionComputer probeHpc) throws HyracksDataException {
+        super(ctx, partition, status, locks, leftRd, rightRd);
 
-        this.accessorBuild = new FrameTupleAccessor(buildRd);
-        this.accessorProbe = new FrameTupleAccessor(probeRd);
+        bufferInfo = new BufferInfo(null, -1, -1);
 
+        this.accessorProbe = new FrameTupleAccessor(leftRd);
         reloadBuffer = new VSizeFrame(ctx);
-        probeRunFileAppender = new FrameTupleAppender(new VSizeFrame(ctx));
-        ipjd = new IntervalPartitionJoinData(k, imjc, numOfPartitions);
-    }
 
-    public void buildInit() throws HyracksDataException {
+        this.numOfPartitions = IntervalPartitionUtil.getMaxPartitions(k);;
+        this.imjc = imjc;
+
+        // TODO fix available memory size
+        this.buildMemory = memorySize;
         buildBufferManager = new VPartitionTupleBufferManager(ctx, VPartitionTupleBufferManager.NO_CONSTRAIN,
                 numOfPartitions, buildMemory * ctx.getInitialFrameSize());
-        System.err.println("k: " + k);
-        buildSize = 0;
-    }
 
-    public void buildStep(ByteBuffer buffer) throws HyracksDataException {
-        accessorBuild.reset(buffer);
-        int tupleCount = accessorBuild.getTupleCount();
+        this.k = k;
+        this.buildHpc = buildHpc;
+        this.probeHpc = probeHpc;
 
-        int pid;
-        for (int i = 0; i < tupleCount; ++i) {
-            pid = buildHpc.partition(accessorBuild, i, k);
+        FileReference file = ctx.getJobletContext().createManagedWorkspaceFile("IntervalPartitionJoiner");
+        probeRunFileWriter = new RunFileWriter(file, ctx.getIOManager());
+        probeRunFileWriter.open();
 
-            if (tmp != pid) {
-                System.err.println("buildSize: " + buildSize + " pid: " + pid + " k: " + k + " pair: "
-                        + IntervalPartitionUtil.getIntervalPartition(pid, k));
-                tmp = pid;
-            }
-            processTuple(i, pid);
-            ipjd.buildIncrementCount(pid);
-            buildSize++;
-        }
-    }
+        probeRunFilePointers = new TreeMap<>(RunFilePointer.ASC);
+        buildInMemoryPartitions = new LinkedList<>();
 
-    public void buildClose() throws HyracksDataException {
-        System.err.println("buildSize: " + buildSize);
-
-        int inMemoryPartitions = 0;
-        int totalBuildPartitions = 0;
-        flushAndClearBuildSpilledPartition();
-
-        // Trying to bring back as many spilled partitions as possible, making them resident
-        bringBackSpilledPartitionIfHasMoreMemory(false);
-
-        // Update build partition join map based on partitions with actual data.
-        for (int i = ipjd.buildNextInMemory(0); i >= 0; i = ipjd.buildNextInMemory(i + 1)) {
-            if (ipjd.buildGetCount(i) == 0) {
-                ipjd.buildRemoveFromJoin(i);
-            } else if (ipjd.buildGetCount(i) > 0) {
-                // Set up build memory for processing joins for partitions in memory.
-                createInMemoryJoiner(i);
-                inMemoryPartitions++;
-                totalBuildPartitions += ipjd.buildGetCount(i);
-            }
-        }
+        this.accessorBuild = new FrameTupleAccessor(rightRd);
 
+        LOGGER.setLevel(Level.FINE);
+        System.out.println("IntervalIndexJoiner: Logging level is: " + LOGGER.getLevel());
         if (LOGGER.isLoggable(Level.FINE)) {
-            LOGGER.fine("IntervalPartitionJoin has closed the build phase. Total tuples: " + totalBuildPartitions
-                    + ", In memory partitions: " + inMemoryPartitions + ", Spilled partitions: "
-                    + ipjd.buildGetSpilledCount());
+            LOGGER.fine("IntervalIndexJoiner has started partition " + partition + " with " + memorySize
+                    + " frames of memory.");
         }
     }
 
-    private void processTuple(int tid, int pid) throws HyracksDataException {
-        while (!buildBufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) {
-            int victimPartition = selectPartitionToSpill();
-            if (victimPartition < 0) {
-                throw new HyracksDataException(
-                        "No more space left in the memory buffer, please give join more memory budgets.");
-            }
-            spillPartition(victimPartition);
-        }
-    }
-
-    private int selectPartitionToSpill() {
-        int partitionToSpill = selectLargestSpilledPartition();
-        int maxToSpillPartSize = 0;
-        if (partitionToSpill < 0 || (maxToSpillPartSize = buildBufferManager.getPhysicalSize(partitionToSpill)) == ctx
-                .getInitialFrameSize()) {
-            int partitionInMem = selectNextInMemoryPartitionToSpill();
-            if (partitionInMem >= 0 && buildBufferManager.getPhysicalSize(partitionInMem) > maxToSpillPartSize) {
-                partitionToSpill = partitionInMem;
+    @Override
+    public void processLeftFrame(IFrameWriter writer) throws HyracksDataException {
+        while (inputAccessor[LEFT_PARTITION].exists()) {
+            int pid = probeHpc.partition(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(), k);
+
+            if (probeRunFilePid != pid) {
+                // Log new partition locations.
+                RunFilePointer rfp = new RunFilePointer(probeRunFileWriter.getFileSize(),
+                        inputAccessor[LEFT_PARTITION].getTupleId());
+                probeRunFilePointers.put(rfp, pid);
+                probeRunFilePid = pid;
             }
+            inputAccessor[LEFT_PARTITION].next();
+            probeSize++;
         }
-        return partitionToSpill;
+        inputBuffer[LEFT_PARTITION].rewind();
+        probeRunFileWriter.nextFrame(inputBuffer[LEFT_PARTITION]);
+        spillWriteCount++;
     }
 
-    /**
-     * Select next partition to spill. The partitions have been numbered in the order they should be spilled.
-     *
-     * @return
-     */
-    private int selectNextInMemoryPartitionToSpill() {
-        for (int i = ipjd.buildNextInMemoryWithResults(0); i >= 0; i = ipjd.buildNextInMemoryWithResults(i + 1)) {
-            if (!ipjd.buildIsSpilled(i) && buildBufferManager.getPhysicalSize(i) > 0) {
-                return i;
-            }
-        }
-        return -1;
-    }
+    @Override
+    public void processLeftClose(IFrameWriter writer) throws HyracksDataException {
+        joinLoopOnMemory(writer);
 
-    private int selectLargestSpilledPartition() {
-        int pid = -1;
-        int max = 0;
-        for (int i = ipjd.buildNextSpilled(0); i >= 0; i = ipjd.buildNextSpilled(i + 1)) {
-            int partSize = buildBufferManager.getPhysicalSize(i);
-            if (partSize > max) {
-                max = partSize;
-                pid = i;
-            }
+        // Flush result.
+        resultAppender.write(writer, true);
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.warning("IntervalPartitionJoiner statitics: " + k + " k, " + joinComparisonCount + " comparisons, "
+                    + joinResultCount + " results, " + spillWriteCount + " written, " + spillReadCount + " read.");
         }
-        return pid;
     }
 
-    private void spillPartition(int pid) throws HyracksDataException {
-        RunFileWriter writer = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.BUILD);
-        spillWriteCount += buildBufferManager.getNumFrames(pid);
-        buildBufferManager.flushPartition(pid, writer);
-        buildBufferManager.clearPartition(pid);
-        ipjd.buildSpill(pid);
-    }
+    private void joinLoopOnMemory(IFrameWriter writer) throws HyracksDataException {
+        RunFileReader pReader = probeRunFileWriter.createDeleteOnCloseReader();
+        pReader.open();
+        // Load first frame.
+        loadReaderNextFrame(pReader);
 
-    private RunFileWriter getSpillWriterOrCreateNewOneIfNotExist(int pid, SIDE whichSide) throws HyracksDataException {
-        RunFileWriter[] runFileWriters = null;
-        String refName = null;
-        switch (whichSide) {
-            case BUILD:
-                runFileWriters = buildRFWriters;
-                refName = buildRelName;
-                break;
-            case PROBE:
-                refName = probeRelName;
-                runFileWriters = probeRFWriters;
-                break;
-            default:
-        }
-        RunFileWriter writer = runFileWriters[pid];
-        if (writer == null) {
-            FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(refName);
-            writer = new RunFileWriter(file, ctx.getIOManager());
-            writer.open();
-            runFileWriters[pid] = writer;
-        }
-        return writer;
-    }
+        while (moreBuildProcessing) {
+            fillMemory();
+            joinMemoryBlockWithRunFile(writer, pReader);
 
-    public void clearBuildMemory() throws HyracksDataException {
-        for (int pid = 0; pid < numOfPartitions; ++pid) {
-            if (buildBufferManager.getNumTuples(pid) > 0) {
+            // Clean up
+            for (int pid : buildInMemoryPartitions) {
                 buildBufferManager.clearPartition(pid);
             }
+            buildInMemoryPartitions.clear();
         }
-        ipjd.buildClearMemory();
-    }
-
-    private void flushAndClearBuildSpilledPartition() throws HyracksDataException {
-        for (int pid = ipjd.buildNextSpilled(0); pid >= 0; pid = ipjd.buildNextSpilled(pid + 1)) {
-            if (buildBufferManager.getNumTuples(pid) > 0) {
-                spillWriteCount += buildBufferManager.getNumFrames(pid);
-                RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.BUILD);
-                buildBufferManager.flushPartition(pid, runFileWriter);
-                buildBufferManager.clearPartition(pid);
-                buildRFWriters[pid].close();
-            }
-        }
-    }
-
-    private void flushProbeSpilledPartition() throws HyracksDataException {
-        if (probeRunFileWriter != null) {
-            // flush previous runFile
-            probeRunFileAppender.write(probeRunFileWriter, true);
-            probeRunFileWriter.close();
-            spillWriteCount++;
-        }
-    }
-
-    private void bringBackSpilledPartitionIfHasMoreMemory(boolean partitalLoad) throws HyracksDataException {
-        int freeFrames = buildMemory;
-        for (int i = ipjd.buildNextInMemoryWithResults(0); i >= 0; i = ipjd.buildNextInMemoryWithResults(i + 1)) {
-            freeFrames -= buildBufferManager.getNumFrames(i);
-        }
-
-        int pid = 0;
-        while ((pid = selectPartitionsToReload(freeFrames, pid, partitalLoad)) >= 0 && freeFrames > 0) {
-            if (pid == 225) {
-                int i = 0;
-            }
-            if (!loadPartitionInMem(pid, buildRFWriters[pid])) {
-                return;
-            }
-            freeFrames -= buildBufferManager.getNumFrames(pid);
-        }
+        pReader.close();
     }
 
-    int buildParitialLoadPid = -1;
-    int buildParitialNextTid = -1;
-    long buildParitialResetReader = -1;
-
-    private boolean loadPartitionInMem(int pid, RunFileWriter wr) throws HyracksDataException {
-        if (pid == 225) {
-            int i = 0;
-        }
-        RunFileReader r = wr.createReader();
-        r.open();
-        if (buildParitialLoadPid == pid && buildParitialResetReader > 0) {
-            r.reset(buildParitialResetReader);
-        }
-        int framesLoaded = 0;
-        while (r.nextFrame(reloadBuffer)) {
-            framesLoaded++;
-            accessorBuild.reset(reloadBuffer.getBuffer());
-            spillReadCount++;
-            for (int tid = buildParitialNextTid > 0 ? buildParitialNextTid : 0; tid < accessorBuild
-                    .getTupleCount(); tid++) {
-                if (!buildBufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) {
-                    // for some reason (e.g. due to fragmentation) if the inserting failed
-                    // we need to start this partition from this location on the next round.
-                    buildParitialLoadPid = pid;
-                    buildParitialNextTid = tid;
-                    buildParitialResetReader = r.getReadPointer();
-                    ipjd.buildLoad(pid);
-                    createInMemoryJoiner(pid);
-                    r.close();
-                    return false;
+    private void joinMemoryBlockWithRunFile(IFrameWriter writer, RunFileReader pReader) throws HyracksDataException {
+        // Join Disk partitions with Memory partitions
+        for (RunFilePointer probeId : probeRunFilePointers.navigableKeySet()) {
+            Pair<Integer, Integer> probe = IntervalPartitionUtil.getIntervalPartition(probeRunFilePointers.get(probeId),
+                    k);
+            for (int buildId : buildInMemoryPartitions) {
+                Pair<Integer, Integer> build = IntervalPartitionUtil.getIntervalPartition(buildId, k);
+                if (imjc.compareIntervalPartition(probe.first, probe.second, build.first, build.second)) {
+                    fbms.add(buildBufferManager.getPartitionFrameBufferManager(buildId));
                 }
             }
-        }
-        if (framesLoaded == 0) {
-            int t = 0;
-        }
-
-        ipjd.buildLoad(pid);
-        createInMemoryJoiner(pid);
-        r.close();
-        buildRFWriters[pid] = null;
-        buildParitialLoadPid = -1;
-        buildParitialNextTid = -1;
-        buildParitialResetReader = -1;
-        return true;
-    }
-
-    private int selectPartitionsToReload(int freeFrames, int pid, boolean partitalLoad) {
-        int freeSpace = freeFrames * ctx.getInitialFrameSize();
-        if (freeSpace > 0 && buildParitialLoadPid > 0 && buildParitialResetReader > 0) {
-            return buildParitialLoadPid;
-        }
-        for (int id = ipjd.buildNextSpilled(pid); id >= 0; id = ipjd.buildNextSpilled(id + 1)) {
-            assert buildRFWriters[id].getFileSize() > 0 : "How come a spilled partition have size 0?";
-            if (partitalLoad || freeSpace >= buildRFWriters[id].getFileSize()) {
-                return id;
+            if (!fbms.isEmpty()) {
+                join(pReader, probeId, fbms, writer);
             }
+            fbms.clear();
         }
-        return -1;
     }
 
-    private void createInMemoryJoiner(int pid) throws HyracksDataException {
-        inMemJoiner[pid] = new InMemoryIntervalPartitionJoin(ctx,
-                buildBufferManager.getPartitionFrameBufferManager(pid), imjc, buildRd, probeRd);
-    }
-
-    private void closeInMemoryJoiner(int pid, IFrameWriter writer) throws HyracksDataException {
-        joinComparisonCount += inMemJoiner[pid].getComparisonCount();
-        joinResultCount += inMemJoiner[pid].getResultCount();
-        inMemJoiner[pid].closeJoin(writer);
-        inMemJoiner[pid] = null;
-    }
-
-    public void probeInit() throws HyracksDataException {
-        probeRFWriters = new RunFileWriter[numOfPartitions];
-        probeSize = 0;
-    }
-
-    public void probeStep(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
-        accessorProbe.reset(buffer);
-        int tupleCount = accessorProbe.getTupleCount();
+    private void join(RunFileReader pReader, RunFilePointer rfpStart, List<IFrameBufferManager> buildFbms,
+            IFrameWriter writer) throws HyracksDataException {
+        long fileOffsetStart = rfpStart.getFileOffset();
+        int tupleStart = rfpStart.getTupleIndex();
 
-        for (int i = 0; i < tupleCount; ++i) {
-            int pid = probeHpc.partition(accessorProbe, i, k);
+        RunFilePointer rfpEnd = probeRunFilePointers.higherKey(rfpStart);
+        long fileOffsetEnd = rfpEnd == null ? pReader.getFileSize() : rfpEnd.getFileOffset();
+        int tupleEnd = rfpEnd == null ? Integer.MAX_VALUE : rfpEnd.getTupleIndex();
 
-            if (tmp != pid) {
-                System.err.println("probeSize: " + probeSize + " pid: " + pid + " k: " + k + " pair: "
-                        + IntervalPartitionUtil.getIntervalPartition(pid, k));
-                tmp = pid;
-            }
-
-            if (!ipjd.hasProbeJoinMap(pid)) {
-                // Set probe join map
-                ipjd.setProbeJoinMap(pid,
-                        IntervalPartitionUtil.getProbeJoinPartitions(pid, ipjd.buildPSizeInTups, imjc, k));
-            }
-
-            // Tuple has potential match from build phase
-            if (!ipjd.isProbeJoinMapEmpty(pid)) {
-                if (ipjd.probeHasSpilled(pid)) {
-                    // pid is Spilled
-                    probeSpillTuple(accessorProbe, i, pid);
-                }
-                for (Iterator<Integer> pidIterator = ipjd.getProbeJoinMap(pid); pidIterator.hasNext();) {
-                    // pid has join partitions that are Resident
-                    int j = pidIterator.next();
-                    if (inMemJoiner[j] != null) {
-                        inMemJoiner[j].join(accessorProbe, i, writer);
-                    }
-                }
-            }
-            ipjd.probeIncrementCount(pid);
-            probeSize++;
-        }
-    }
-
-    /**
-     * Closes the probe process.
-     * We do NOT join the spilled partitions here, use {@link joinSpilledPartitions}.
-     *
-     * @param writer
-     * @throws HyracksDataException
-     */
-    public void probeClose(IFrameWriter writer) throws HyracksDataException {
-        System.err.println("probeSize: " + probeSize);
-
-        for (int i = 0; i < inMemJoiner.length; ++i) {
-            if (inMemJoiner[i] != null) {
-                closeInMemoryJoiner(i, writer);
-                ipjd.buildLogJoined(i);
-                ipjd.buildRemoveFromJoin(i);
-            }
-        }
-        clearBuildMemory();
-        flushProbeSpilledPartition();
-    }
-
-    private void probeSpillTuple(IFrameTupleAccessor accessorProbe, int probeTupleIndex, int pid)
-            throws HyracksDataException {
-        if (pid != probeRunFilePid) {
-            flushProbeSpilledPartition();
-            probeRunFileWriter = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.PROBE);
-            probeRunFilePid = pid;
-        }
-        if (!probeRunFileAppender.append(accessorProbe, probeTupleIndex)) {
-            probeRunFileAppender.write(probeRunFileWriter, true);
-            probeRunFileAppender.append(accessorProbe, probeTupleIndex);
-            spillWriteCount++;
+        if (pReader.getReadPointer() != fileOffsetStart) {
+            pReader.reset(fileOffsetStart);
+            loadReaderNextFrame(pReader);
         }
-    }
+        do {
+            int start = pReader.getReadPointer() == fileOffsetStart ? tupleStart : 0;
+            int end = pReader.getReadPointer() == fileOffsetEnd ? tupleEnd : accessorProbe.getTupleCount();
 
-    public RunFileReader getBuildRFReader(int pid) throws HyracksDataException {
-        return (buildRFWriters[pid] == null) ? null : (buildRFWriters[pid]).createReader();
-    }
-
-    public RunFileReader getProbeRFReader(int pid) throws HyracksDataException {
-        return (probeRFWriters[pid] == null) ? null : (probeRFWriters[pid]).createReader();
-    }
-
-    public void joinSpilledPartitions(IFrameWriter writer) throws HyracksDataException {
-        LinkedHashMap<Integer, LinkedHashSet<Integer>> probeInMemoryJoinMap;
-        while (ipjd.buildGetSpilledCount() > 0) {
-            // Load back spilled build partitions.
-            // TODO only load partition required for spill join. Consider both sides.
-            bringBackSpilledPartitionIfHasMoreMemory(true);
-
-            // Create in memory joiners.
-            //            for (int pid = ipjd.buildNextInMemoryWithResults(0); pid >= 0; pid = ipjd
-            //                    .buildNextInMemoryWithResults(pid + 1)) {
-            //                createInMemoryJoiner(pid);
-            //            }
-
-            probeInMemoryJoinMap = ipjd.probeGetInMemoryJoinMap();
-
-            // Join all build partitions with disk probe partitions.
-            for (Entry<Integer, LinkedHashSet<Integer>> entry : probeInMemoryJoinMap.entrySet()) {
-                if (entry.getKey() == 221) {
-                    int t = 0;
-                }
-                System.err.println(" join pid: " + entry.getKey() + " with : " + probeInMemoryJoinMap);
-
-                if (ipjd.probeGetCount(entry.getKey()) > 0 && !probeInMemoryJoinMap.get(entry.getKey()).isEmpty()) {
-                    joinSpilledProbeWithBuildMemory(writer, probeInMemoryJoinMap, entry.getKey());
-                }
-            }
-
-            // Clean up build memory.
-            for (int pid = ipjd.buildNextInMemoryWithResults(0); pid >= 0; pid = ipjd
-                    .buildNextInMemoryWithResults(pid + 1)) {
-                closeInMemoryJoiner(pid, writer);
-                if (pid != buildParitialLoadPid) {
-                    ipjd.buildLogJoined(pid);
-                    ipjd.buildRemoveFromJoin(pid);
-                } else {
-                    int t = 0;
+            for (int i = start; i < end; ++i) {
+                // Tuple has potential match from build phase
+                for (IFrameBufferManager fbm : buildFbms) {
+                    joinTupleWithMemoryPartition(accessorProbe, i, fbm, writer);
                 }
             }
-            clearBuildMemory();
-        }
+        } while (pReader.getReadPointer() < fileOffsetEnd && loadReaderNextFrame(pReader));
     }
 
-    private void joinSpilledProbeWithBuildMemory(IFrameWriter writer,
-            LinkedHashMap<Integer, LinkedHashSet<Integer>> probeInMemoryJoinMap, int probePid)
-            throws HyracksDataException {
-        RunFileReader pReader = getProbeRFReader(probePid);
-        pReader.open();
-        while (pReader.nextFrame(reloadBuffer)) {
+    private boolean loadReaderNextFrame(RunFileReader pReader) throws HyracksDataException {
+        if (pReader.nextFrame(reloadBuffer)) {
             accessorProbe.reset(reloadBuffer.getBuffer());
             spillReadCount++;
-            for (int i = 0; i < accessorProbe.getTupleCount(); ++i) {
-                // Tuple has potential match from build phase
-                for (Integer j : probeInMemoryJoinMap.get(probePid)) {
-                    // j has join partitions that are Resident
-                    if (inMemJoiner[j] != null) {
-                        inMemJoiner[j].join(accessorProbe, i, writer);
-                    }
-                }
-            }
+            return true;
         }
-        pReader.close();
+        return false;
     }
 
-    class IntervalPartitionJoinData {
-        private LinkedHashMap<Integer, LinkedHashSet<Integer>> probeJoinMap;
-
-        private int[] buildPSizeInTups;
-        private int[] probePSizeInTups;
-
-        private BitSet buildJoinedCompleted; //0=waiting, 1=joined
-        private BitSet buildSpilledStatus; //0=resident, 1=spilled
-        private BitSet buildInMemoryStatus; //0=unknown, 1=resident
-        private BitSet probeSpilledStatus; //0=resident, 1=spilled
-
-        public IntervalPartitionJoinData(int k, IIntervalMergeJoinChecker imjc, int numberOfPartitions) {
-            probeJoinMap = new LinkedHashMap<>();
-
-            buildPSizeInTups = new int[numberOfPartitions];
-            probePSizeInTups = new int[numberOfPartitions];
-
-            buildJoinedCompleted = new BitSet(numberOfPartitions);
-            buildInMemoryStatus = new BitSet(numberOfPartitions);
-            buildSpilledStatus = new BitSet(numberOfPartitions);
-            probeSpilledStatus = new BitSet(numberOfPartitions);
-        }
-
-        public LinkedHashMap<Integer, LinkedHashSet<Integer>> probeGetInMemoryJoinMap() {
-            return IntervalPartitionUtil.getInMemorySpillJoinMap(probeJoinMap, buildInMemoryStatus, probeSpilledStatus);
-        }
-
-        public boolean hasProbeJoinMap(int pid) {
-            return probeJoinMap.containsKey(pid);
-        }
-
-        public boolean isProbeJoinMapEmpty(int pid) {
-            return probeJoinMap.get(pid).isEmpty();
-        }
-
-        public Iterator<Integer> getProbeJoinMap(int pid) {
-            return probeJoinMap.get(pid).iterator();
-        }
-
-        public void setProbeJoinMap(int pid, LinkedHashSet<Integer> map) {
-            probeJoinMap.put(new Integer(pid), map);
-            for (Integer i : map) {
-                if (buildIsSpilled(i)) {
-                    // Build join partition has spilled. Now spill the probe also.
-                    probeSpilledStatus.set(pid);
+    public void joinTupleWithMemoryPartition(IFrameTupleAccessor accessorProbe, int probeTupleIndex,
+            IFrameBufferManager fbm, IFrameWriter writer) throws HyracksDataException { // checks one probe tuple against every tuple in fbm
+        if (fbm.getNumFrames() == 0) { // no frames in this partition, so there is nothing to compare
+            return;
+        }
+        fbm.resetIterator(); // frames are then walked with next()/exists()
+        int frameIndex = fbm.next();
+        while (fbm.exists()) {
+            fbm.getFrame(frameIndex, bufferInfo); // bufferInfo is a single field instance reused for every frame
+            accessorBuild.reset(bufferInfo.getBuffer()); // point the build-side accessor at the current frame
+            for (int buildTupleIndex = 0; buildTupleIndex < accessorBuild.getTupleCount(); ++buildTupleIndex) {
+                if (imjc.checkToSaveInResult(accessorBuild, buildTupleIndex, accessorProbe, probeTupleIndex, false)) {
+                    appendToResult(accessorBuild, buildTupleIndex, accessorProbe, probeTupleIndex, writer); // build tuple first, then probe tuple
                 }
+                joinComparisonCount++; // counted for every build tuple checked, whether or not it matched
             }
+            frameIndex = fbm.next(); // advance; the loop ends once exists() reports false
         }
+    }
 
-        public void buildClearMemory() {
-            buildInMemoryStatus.clear();
-        }
-
-        public void buildIncrementCount(int pid) {
-            buildInMemoryStatus.set(pid);
-            buildPSizeInTups[pid]++;
-        }
-
-        public int buildGetCount(int pid) {
-            return buildPSizeInTups[pid];
-        }
-
-        public void buildLogJoined(int pid) {
-            buildSpilledStatus.clear(pid);
-            buildJoinedCompleted.set(pid);
-        }
-
-        public void buildRemoveFromJoin(int pid) {
-            buildSpilledStatus.clear(pid);
-            buildJoinedCompleted.set(pid);
-        }
-
-        public boolean buildHasBeenJoined(int pid) {
-            return buildJoinedCompleted.get(pid);
-        }
-
-        public int buildGetSpilledCount() {
-            return buildSpilledStatus.cardinality();
-        }
-
-        public void buildSpill(int pid) {
-            buildInMemoryStatus.clear(pid);
-            buildSpilledStatus.set(pid);
-        }
-
-        public void buildLoad(int pid) {
-            buildInMemoryStatus.set(pid);
-            buildSpilledStatus.clear(pid);
-        }
-
-        public boolean buildIsSpilled(int pid) {
-            return buildSpilledStatus.get(pid);
-        }
-
-        public int buildNextSpilled(int pid) {
-            return buildSpilledStatus.nextSetBit(pid);
-        }
-
-        public int buildNextInMemoryWithResults(int pid) {
-            int nextPid = buildNextInMemory(pid);
-            do {
-                if (nextPid < 0 || buildGetCount(nextPid) > 0) {
-                    return nextPid;
-                }
-                nextPid = buildNextInMemory(nextPid + 1);
-            } while (nextPid >= 0);
-            return -1;
-        }
+    private void appendToResult(IFrameTupleAccessor accessorBuild, int buildSidetIx, IFrameTupleAccessor accessorProbe,
+            int probeSidetIx, IFrameWriter writer) throws HyracksDataException {
+        FrameUtils.appendConcatToWriter(writer, resultAppender, accessorBuild, buildSidetIx, accessorProbe,
+                probeSidetIx);
+        joinResultCount++;
+    }
 
-        public int buildNextInMemory(int pid) {
-            int nextPid = buildSpilledStatus.nextClearBit(pid);
-            if (nextPid >= numOfPartitions) {
-                return -1;
+    private void fillMemory() throws HyracksDataException {
+        int buildPid = -1;
+        TupleStatus ts;
+        for (ts = loadRightTuple(); ts.isLoaded(); ts = loadRightTuple()) {
+            int pid = buildHpc.partition(inputAccessor[RIGHT_PARTITION], inputAccessor[RIGHT_PARTITION].getTupleId(),
+                    k);
+            if (!buildBufferManager.insertTuple(pid, inputAccessor[RIGHT_PARTITION],
+                    inputAccessor[RIGHT_PARTITION].getTupleId(), tempPtr)) {
+                return;
             }
-            do {
-                if (!buildHasBeenJoined(nextPid)) {
-                    return nextPid;
-                }
-                nextPid = buildSpilledStatus.nextClearBit(nextPid + 1);
-            } while (nextPid >= 0 && nextPid < numOfPartitions);
-            return -1;
-        }
-
-        public void probeIncrementCount(int pid) {
-            probePSizeInTups[pid]++;
-        }
-
-        public int probeGetCount(int pid) {
-            return probePSizeInTups[pid];
-        }
-
-        public void probeSpill(int pid) {
-            probeSpilledStatus.set(pid);
-        }
 
-        public boolean probeHasSpilled(int pid) {
-            return probeSpilledStatus.get(pid);
-        }
-
-        public int buildGetMaxPartitionSize() {
-            int max = buildPSizeInTups[0];
-            for (int i = 1; i < buildPSizeInTups.length; i++) {
-                if (buildPSizeInTups[i] > max) {
-                    max = buildPSizeInTups[i];
-                }
+            if (buildPid != pid) {
+                // Track new partitions in memory.
+                buildInMemoryPartitions.add(pid);
+                buildPid = pid;
             }
-            return max;
+            inputAccessor[RIGHT_PARTITION].next();
+            buildSize++;
         }
-
-        public int probeGetMaxPartitionSize() {
-            int max = probePSizeInTups[0];
-            for (int i = 1; i < probePSizeInTups.length; i++) {
-                if (probePSizeInTups[i] > max) {
-                    max = probePSizeInTups[i];
-                }
-            }
-            return max;
+        if (ts.isEmpty()) {
+            moreBuildProcessing = false;
         }
-
     }
 
-    public void closeAndDeleteRunFiles() throws HyracksDataException {
-        for (RunFileWriter rfw : buildRFWriters) {
-            if (rfw != null) {
-                FileUtils.deleteQuietly(rfw.getFileReference().getFile());
-            }
-        }
-        for (RunFileWriter rfw : probeRFWriters) {
-            if (rfw != null) {
-                FileUtils.deleteQuietly(rfw.getFileReference().getFile());
-            }
-        }
-        if (LOGGER.isLoggable(Level.WARNING)) {
-            LOGGER.warning("IntervalPartitionJoiner statitics: " + joinComparisonCount + " comparisons, "
-                    + joinResultCount + " results, " + spillWriteCount + " written, " + spillReadCount + " read.");
+    private TupleStatus loadRightTuple() throws HyracksDataException {
+        TupleStatus loaded = loadMemoryTuple(RIGHT_PARTITION);
+        if (loaded == TupleStatus.UNKNOWN) {
+            loaded = pauseAndLoadRightTuple();
         }
+        return loaded;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0b900514/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java
index 453287d..671c082 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java
@@ -18,14 +18,6 @@
  */
 package org.apache.asterix.runtime.operators.joins.intervalpartition;
 
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.Map.Entry;
-
-import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinChecker;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
 import org.apache.hyracks.api.dataflow.value.IRangeMap;
@@ -109,27 +101,6 @@ public class IntervalPartitionUtil {
         return (k * k + k) / 2;
     }
 
-    public static void printJoinPartitionMap(ArrayList<HashSet<Integer>> partitionMap) {
-        for (int i = 0; i < partitionMap.size(); ++i) {
-            System.out.print("(hashset) Partition " + i + " must join with partition(s): ");
-            for (Integer map : partitionMap.get(i)) {
-                System.out.print(map + " ");
-            }
-            System.out.println("");
-        }
-    }
-
-    public static void printPartitionMap(int k) {
-        for (int i = 0; i < k; ++i) {
-            for (int j = i; j < k; ++j) {
-                int pid = intervalPartitionMap(i, j, k);
-                Pair<Integer, Integer> partition = getIntervalPartition(pid, k);
-                System.out.println("Map partition (" + i + ", " + j + ") to partition id: " + pid + " back to pair ("
-                        + partition.first + ", " + partition.second + ")");
-            }
-        }
-    }
-
     /**
      * Map the partition start and end points to a single value.
      * The mapped partitions are sorted in interval starting at 0.
@@ -192,47 +163,6 @@ public class IntervalPartitionUtil {
         return partitionEnd;
     }
 
-    public static LinkedHashSet<Integer> getProbeJoinPartitions(int pid, int[] buildPSizeInTups,
-            IIntervalMergeJoinChecker imjc, int k) {
-        LinkedHashSet<Integer> joinMap = new LinkedHashSet<>();
-        Pair<Integer, Integer> map = getIntervalPartition(pid, k);
-        int probeStart = map.first;
-        int probeEnd = map.second;
-        // Build partitions with data
-        for (int buildStart = 0; buildStart < k; ++buildStart) {
-            for (int buildEnd = k - 1; buildStart <= buildEnd; --buildEnd) {
-                int buildId = intervalPartitionMap(buildStart, buildEnd, k);
-                if (buildPSizeInTups[buildId] > 0) {
-                    // Join partitions for probe's pid
-                    if (!(buildStart == 0 && probeStart == 0)
-                            && imjc.compareIntervalPartition(buildStart, buildEnd, probeStart, probeEnd)) {
-                        joinMap.add(buildId);
-                    }
-                }
-            }
-        }
-        return joinMap;
-    }
-
-    public static LinkedHashMap<Integer, LinkedHashSet<Integer>> getInMemorySpillJoinMap(
-            LinkedHashMap<Integer, LinkedHashSet<Integer>> probeJoinMap, BitSet buildInMemoryStatus,
-            BitSet probeSpilledStatus) {
-        LinkedHashMap<Integer, LinkedHashSet<Integer>> inMemoryMap = new LinkedHashMap<>();
-        for (Entry<Integer, LinkedHashSet<Integer>> entry : probeJoinMap.entrySet()) {
-            if (probeSpilledStatus.get(entry.getKey())) {
-                for (Integer i : entry.getValue()) {
-                    if (buildInMemoryStatus.get(i)) {
-                        if (!inMemoryMap.containsKey(entry.getKey())) {
-                            inMemoryMap.put(entry.getKey(), new LinkedHashSet<Integer>());
-                        }
-                        inMemoryMap.get(entry.getKey()).add(i);
-                    }
-                }
-            }
-        }
-        return inMemoryMap;
-    }
-
     public static long getPartitionDuration(long partitionStart, long partitionEnd, int k) throws HyracksDataException {
         if (k <= 2) {
             throw new HyracksDataException("k is to small for interval partitioner.");