You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by pr...@apache.org on 2016/10/17 19:55:33 UTC

[43/50] [abbrv] asterixdb git commit: new partition join algorithm

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0b900514/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoinOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoinOperatorDescriptor.java
deleted file mode 100644
index a985eee..0000000
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoinOperatorDescriptor.java
+++ /dev/null
@@ -1,319 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.runtime.operators.joins.intervalpartition2;
-
-import java.nio.ByteBuffer;
-import java.util.logging.Logger;
-
-import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinChecker;
-import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinCheckerFactory;
-import org.apache.asterix.runtime.operators.joins.intervalpartition.IntervalPartitionComputerFactory;
-import org.apache.asterix.runtime.operators.joins.intervalpartition.IntervalPartitionUtil;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.ActivityId;
-import org.apache.hyracks.api.dataflow.IActivity;
-import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.TaskId;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
-import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-import org.apache.hyracks.dataflow.std.base.RangeId;
-import org.apache.hyracks.dataflow.std.join.MergeBranchStatus.Stage;
-import org.apache.hyracks.dataflow.std.join.MergeJoinLocks;
-import org.apache.hyracks.dataflow.std.misc.RangeForwardOperatorDescriptor.RangeForwardTaskState;
-
-public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDescriptor {
-    private static final long serialVersionUID = 1L;
-
-    private static final int LEFT_ACTIVITY_ID = 0;
-    private static final int RIGHT_ACTIVITY_ID = 1;
-    private final int[] leftKeys;
-    private final int[] rightKeys;
-    private final int memoryForJoin;
-    private final IIntervalMergeJoinCheckerFactory imjcf;
-    private final RangeId rangeId;
-    private final int k;
-
-    private final int probeKey;
-    private final int buildKey;
-
-    private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoinOperatorDescriptor.class.getName());
-
-    public IntervalPartitionJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memoryForJoin, int k,
-            int[] leftKeys, int[] rightKeys, RecordDescriptor recordDescriptor, IIntervalMergeJoinCheckerFactory imjcf,
-            RangeId rangeId) {
-        super(spec, 2, 1);
-        recordDescriptors[0] = recordDescriptor;
-        this.buildKey = leftKeys[0];
-        this.probeKey = rightKeys[0];
-        this.k = k;
-        this.leftKeys = leftKeys;
-        this.rightKeys = rightKeys;
-        this.memoryForJoin = memoryForJoin;
-        this.imjcf = imjcf;
-        this.rangeId = rangeId;
-    }
-
-    @Override
-    public void contributeActivities(IActivityGraphBuilder builder) {
-        MergeJoinLocks locks = new MergeJoinLocks();
-
-        ActivityId leftAid = new ActivityId(odId, LEFT_ACTIVITY_ID);
-        ActivityId rightAid = new ActivityId(odId, RIGHT_ACTIVITY_ID);
-
-        IActivity leftAN = new LeftJoinerActivityNode(leftAid, rightAid, locks);
-        IActivity rightAN = new RightDataActivityNode(rightAid, leftAid, locks);
-
-        builder.addActivity(this, rightAN);
-        builder.addSourceEdge(1, rightAN, 0);
-
-        builder.addActivity(this, leftAN);
-        builder.addSourceEdge(0, leftAN, 0);
-        builder.addTargetEdge(0, leftAN, 0);
-    }
-
-    private class LeftJoinerActivityNode extends AbstractActivityNode {
-        private static final long serialVersionUID = 1L;
-
-        private final MergeJoinLocks locks;
-
-        public LeftJoinerActivityNode(ActivityId id, ActivityId joinAid, MergeJoinLocks locks) {
-            super(id);
-            this.locks = locks;
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
-                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
-                throws HyracksDataException {
-            locks.setPartitions(nPartitions);
-            final RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
-            return new LeftJoinerOperator(ctx, partition, inRecordDesc);
-        }
-
-        private class LeftJoinerOperator extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
-
-            private final IHyracksTaskContext ctx;
-            private final int partition;
-            private final RecordDescriptor leftRd;
-            private IntervalPartitionJoinTaskState state;
-            private boolean first = true;
-
-            public LeftJoinerOperator(IHyracksTaskContext ctx, int partition, RecordDescriptor inRecordDesc) {
-                this.ctx = ctx;
-                this.partition = partition;
-                this.leftRd = inRecordDesc;
-            }
-
-            @Override
-            public void open() throws HyracksDataException {
-                locks.getLock(partition).lock();
-                try {
-                    writer.open();
-                    state = new IntervalPartitionJoinTaskState(ctx.getJobletContext().getJobId(),
-                            new TaskId(getActivityId(), partition));;
-                    state.leftRd = leftRd;
-                    ctx.setStateObject(state);
-                    locks.getRight(partition).signal();
-
-                    do {
-                        // Continue after joiner created in right branch.
-                        if (state.partitionJoiner == null) {
-                            locks.getLeft(partition).await();
-                        }
-                    } while (state.partitionJoiner == null);
-                    state.status.branch[LEFT_ACTIVITY_ID].setStageOpen();
-                    locks.getRight(partition).signal();
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                } finally {
-                    locks.getLock(partition).unlock();
-                }
-            }
-
-            @Override
-            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                locks.getLock(partition).lock();
-                if (first) {
-                    state.status.branch[LEFT_ACTIVITY_ID].setStageData();
-                    first = false;
-                }
-                try {
-                    state.partitionJoiner.setFrame(LEFT_ACTIVITY_ID, buffer);
-                    state.partitionJoiner.processLeftFrame(writer);
-                } finally {
-                    locks.getLock(partition).unlock();
-                }
-            }
-
-            @Override
-            public void fail() throws HyracksDataException {
-                locks.getLock(partition).lock();
-                try {
-                    state.failed = true;
-                } finally {
-                    locks.getLock(partition).unlock();
-                }
-            }
-
-            @Override
-            public void close() throws HyracksDataException {
-                locks.getLock(partition).lock();
-                try {
-                    state.status.branch[LEFT_ACTIVITY_ID].noMore();
-                    if (state.failed) {
-                        writer.fail();
-                    } else {
-                        state.partitionJoiner.processLeftClose(writer);
-                        writer.close();
-                    }
-                    state.status.branch[LEFT_ACTIVITY_ID].setStageClose();
-                    locks.getRight(partition).signal();
-                } finally {
-                    locks.getLock(partition).unlock();
-                }
-            }
-        }
-    }
-
-    private class RightDataActivityNode extends AbstractActivityNode {
-        private static final long serialVersionUID = 1L;
-
-        private final ActivityId joinAid;
-        private final MergeJoinLocks locks;
-
-        public RightDataActivityNode(ActivityId id, ActivityId joinAid, MergeJoinLocks locks) {
-            super(id);
-            this.joinAid = joinAid;
-            this.locks = locks;
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
-                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
-                throws HyracksDataException {
-            locks.setPartitions(nPartitions);
-            RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
-            return new RightDataOperator(ctx, partition, inRecordDesc);
-        }
-
-        private class RightDataOperator extends AbstractUnaryInputSinkOperatorNodePushable {
-
-            private int partition;
-            private IHyracksTaskContext ctx;
-            private final RecordDescriptor rightRd;
-            private IntervalPartitionJoinTaskState state;
-            private boolean first = true;
-
-            public RightDataOperator(IHyracksTaskContext ctx, int partition, RecordDescriptor inRecordDesc) {
-                this.ctx = ctx;
-                this.partition = partition;
-                this.rightRd = inRecordDesc;
-            }
-
-            @Override
-            public void open() throws HyracksDataException {
-                locks.getLock(partition).lock();
-                try {
-                    do {
-                        // Wait for the state to be set in the context form Left.
-                        state = (IntervalPartitionJoinTaskState) ctx.getStateObject(new TaskId(joinAid, partition));
-                        if (state == null) {
-                            locks.getRight(partition).await();
-                        }
-                    } while (state == null);
-                    state.k = k;
-
-                    RangeForwardTaskState rangeState = RangeForwardTaskState.getRangeState(rangeId.getId(), ctx);
-                    long partitionStart = IntervalPartitionUtil.getStartOfPartition(rangeState.getRangeMap(),
-                            partition);
-                    long partitionEnd = IntervalPartitionUtil.getEndOfPartition(rangeState.getRangeMap(), partition);
-                    ITuplePartitionComputer buildHpc = new IntervalPartitionComputerFactory(buildKey, state.k,
-                            partitionStart, partitionEnd).createPartitioner();
-                    ITuplePartitionComputer probeHpc = new IntervalPartitionComputerFactory(probeKey, state.k,
-                            partitionStart, partitionEnd).createPartitioner();
-                    IIntervalMergeJoinChecker imjc = imjcf.createMergeJoinChecker(leftKeys, rightKeys, partition, ctx);
-
-                    state.rightRd = rightRd;
-                    state.partitionJoiner = new IntervalPartitionJoiner(ctx, memoryForJoin, partition, state.k,
-                            state.status, locks, imjc, state.leftRd, state.rightRd, buildHpc, probeHpc);
-                    state.status.branch[RIGHT_ACTIVITY_ID].setStageOpen();
-                    locks.getLeft(partition).signal();
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                } finally {
-                    locks.getLock(partition).unlock();
-                }
-            }
-
-            @Override
-            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                locks.getLock(partition).lock();
-                if (first) {
-                    state.status.branch[RIGHT_ACTIVITY_ID].setStageData();
-                    first = false;
-                }
-                try {
-                    while (!state.status.continueRightLoad
-                            && state.status.branch[LEFT_ACTIVITY_ID].getStatus() != Stage.CLOSED) {
-                        // Wait for the state to request right frame unless left has finished.
-                        locks.getRight(partition).await();
-                    }
-                    state.partitionJoiner.setFrame(RIGHT_ACTIVITY_ID, buffer);
-                    state.status.continueRightLoad = false;
-                    locks.getLeft(partition).signal();
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                } finally {
-                    locks.getLock(partition).unlock();
-                }
-            }
-
-            @Override
-            public void fail() throws HyracksDataException {
-                locks.getLock(partition).lock();
-                try {
-                    state.failed = true;
-                    locks.getLeft(partition).signal();
-                } finally {
-                    locks.getLock(partition).unlock();
-                }
-            }
-
-            @Override
-            public void close() throws HyracksDataException {
-                locks.getLock(partition).lock();
-                try {
-                    state.status.branch[RIGHT_ACTIVITY_ID].setStageClose();
-                    locks.getLeft(partition).signal();
-                } finally {
-                    locks.getLock(partition).unlock();
-                }
-            }
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0b900514/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoinTaskState.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoinTaskState.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoinTaskState.java
deleted file mode 100644
index e8563c2..0000000
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoinTaskState.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.operators.joins.intervalpartition2;
-
-import org.apache.hyracks.api.dataflow.TaskId;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.dataflow.std.join.MergeJoinTaskState;
-
-public class IntervalPartitionJoinTaskState extends MergeJoinTaskState {
-    protected IntervalPartitionJoiner partitionJoiner;
-    public int k;
-
-    public IntervalPartitionJoinTaskState(JobId jobId, TaskId taskId) {
-        super(jobId, taskId);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0b900514/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoiner.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoiner.java
deleted file mode 100644
index fb2edd7..0000000
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoiner.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.operators.joins.intervalpartition2;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.TreeMap;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinChecker;
-import org.apache.asterix.runtime.operators.joins.intervalpartition.IntervalPartitionUtil;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import org.apache.hyracks.dataflow.common.io.RunFileReader;
-import org.apache.hyracks.dataflow.common.io.RunFileWriter;
-import org.apache.hyracks.dataflow.std.buffermanager.BufferInfo;
-import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager;
-import org.apache.hyracks.dataflow.std.buffermanager.VPartitionTupleBufferManager;
-import org.apache.hyracks.dataflow.std.join.AbstractMergeJoiner;
-import org.apache.hyracks.dataflow.std.join.MergeJoinLocks;
-import org.apache.hyracks.dataflow.std.join.MergeStatus;
-import org.apache.hyracks.dataflow.std.structures.RunFilePointer;
-import org.apache.hyracks.dataflow.std.structures.TuplePointer;
-
-public class IntervalPartitionJoiner extends AbstractMergeJoiner {
-
-    private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoiner.class.getName());
-
-    private final RunFileWriter probeRunFileWriter;
-    private int probeRunFilePid = -1;
-
-    private final ITuplePartitionComputer buildHpc;
-    private final ITuplePartitionComputer probeHpc;
-
-    private final int buildMemory;
-    private final int k;
-    private final int numOfPartitions;
-    private long buildSize = 0;
-    private long probeSize = 0;
-    private final TreeMap<RunFilePointer, Integer> probeRunFilePointers;
-
-    private final VPartitionTupleBufferManager buildBufferManager;
-    private final TuplePointer tempPtr = new TuplePointer();
-    private final List<Integer> buildInMemoryPartitions;
-    private final FrameTupleAccessor accessorBuild;
-    private BufferInfo bufferInfo;
-
-    private long spillWriteCount = 0;
-    private long spillReadCount = 0;
-    private long joinComparisonCount = 0;
-    private long joinResultCount = 0;
-    private final IIntervalMergeJoinChecker imjc;
-    private final FrameTupleAccessor accessorProbe;
-    private final IFrame reloadBuffer;
-    private boolean moreBuildProcessing = true;
-    private final List<IFrameBufferManager> fbms = new ArrayList<>();
-
-    public IntervalPartitionJoiner(IHyracksTaskContext ctx, int memorySize, int partition, int k, MergeStatus status,
-            MergeJoinLocks locks, IIntervalMergeJoinChecker imjc, RecordDescriptor leftRd, RecordDescriptor rightRd,
-            ITuplePartitionComputer buildHpc, ITuplePartitionComputer probeHpc) throws HyracksDataException {
-        super(ctx, partition, status, locks, leftRd, rightRd);
-
-        bufferInfo = new BufferInfo(null, -1, -1);
-
-        this.accessorProbe = new FrameTupleAccessor(leftRd);
-        reloadBuffer = new VSizeFrame(ctx);
-
-        this.numOfPartitions = IntervalPartitionUtil.getMaxPartitions(k);;
-        this.imjc = imjc;
-
-        // TODO fix available memory size
-        this.buildMemory = memorySize;
-        buildBufferManager = new VPartitionTupleBufferManager(ctx, VPartitionTupleBufferManager.NO_CONSTRAIN,
-                numOfPartitions, buildMemory * ctx.getInitialFrameSize());
-
-        this.k = k;
-        this.buildHpc = buildHpc;
-        this.probeHpc = probeHpc;
-
-        FileReference file = ctx.getJobletContext().createManagedWorkspaceFile("IntervalPartitionJoiner");
-        probeRunFileWriter = new RunFileWriter(file, ctx.getIOManager());
-        probeRunFileWriter.open();
-
-        probeRunFilePointers = new TreeMap<>(RunFilePointer.ASC);
-        buildInMemoryPartitions = new LinkedList<>();
-
-        this.accessorBuild = new FrameTupleAccessor(rightRd);
-
-        LOGGER.setLevel(Level.FINE);
-        System.out.println("IntervalIndexJoiner: Logging level is: " + LOGGER.getLevel());
-        if (LOGGER.isLoggable(Level.FINE)) {
-            LOGGER.fine("IntervalIndexJoiner has started partition " + partition + " with " + memorySize
-                    + " frames of memory.");
-        }
-    }
-
-    @Override
-    public void processLeftFrame(IFrameWriter writer) throws HyracksDataException {
-        while (inputAccessor[LEFT_PARTITION].exists()) {
-            int pid = probeHpc.partition(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(), k);
-
-            if (probeRunFilePid != pid) {
-                // Log new partition locations.
-                RunFilePointer rfp = new RunFilePointer(probeRunFileWriter.getFileSize(),
-                        inputAccessor[LEFT_PARTITION].getTupleId());
-                probeRunFilePointers.put(rfp, pid);
-                probeRunFilePid = pid;
-            }
-            inputAccessor[LEFT_PARTITION].next();
-            probeSize++;
-        }
-        inputBuffer[LEFT_PARTITION].rewind();
-        probeRunFileWriter.nextFrame(inputBuffer[LEFT_PARTITION]);
-        spillWriteCount++;
-    }
-
-    @Override
-    public void processLeftClose(IFrameWriter writer) throws HyracksDataException {
-        joinLoopOnMemory(writer);
-
-        // Flush result.
-        resultAppender.write(writer, true);
-        if (LOGGER.isLoggable(Level.WARNING)) {
-            LOGGER.warning("IntervalPartitionJoiner statitics: " + k + " k, " + joinComparisonCount + " comparisons, "
-                    + joinResultCount + " results, " + spillWriteCount + " written, " + spillReadCount + " read.");
-        }
-    }
-
-    private void joinLoopOnMemory(IFrameWriter writer) throws HyracksDataException {
-        RunFileReader pReader = probeRunFileWriter.createDeleteOnCloseReader();
-        pReader.open();
-        // Load first frame.
-        loadReaderNextFrame(pReader);
-
-        while (moreBuildProcessing) {
-            fillMemory();
-            joinMemoryBlockWithRunFile(writer, pReader);
-
-            // Clean up
-            for (int pid : buildInMemoryPartitions) {
-                buildBufferManager.clearPartition(pid);
-            }
-            buildInMemoryPartitions.clear();
-        }
-        pReader.close();
-    }
-
-    private void joinMemoryBlockWithRunFile(IFrameWriter writer, RunFileReader pReader) throws HyracksDataException {
-        // Join Disk partitions with Memory partitions
-        for (RunFilePointer probeId : probeRunFilePointers.navigableKeySet()) {
-            Pair<Integer, Integer> probe = IntervalPartitionUtil.getIntervalPartition(probeRunFilePointers.get(probeId),
-                    k);
-            for (int buildId : buildInMemoryPartitions) {
-                Pair<Integer, Integer> build = IntervalPartitionUtil.getIntervalPartition(buildId, k);
-                if (imjc.compareIntervalPartition(probe.first, probe.second, build.first, build.second)) {
-                    fbms.add(buildBufferManager.getPartitionFrameBufferManager(buildId));
-                }
-            }
-            if (!fbms.isEmpty()) {
-                join(pReader, probeId, fbms, writer);
-            }
-            fbms.clear();
-        }
-    }
-
-    private void join(RunFileReader pReader, RunFilePointer rfpStart, List<IFrameBufferManager> buildFbms,
-            IFrameWriter writer) throws HyracksDataException {
-        long fileOffsetStart = rfpStart.getFileOffset();
-        int tupleStart = rfpStart.getTupleIndex();
-
-        RunFilePointer rfpEnd = probeRunFilePointers.higherKey(rfpStart);
-        long fileOffsetEnd = rfpEnd == null ? pReader.getFileSize() : rfpEnd.getFileOffset();
-        int tupleEnd = rfpEnd == null ? Integer.MAX_VALUE : rfpEnd.getTupleIndex();
-
-        if (pReader.getReadPointer() != fileOffsetStart) {
-            pReader.reset(fileOffsetStart);
-            loadReaderNextFrame(pReader);
-        }
-        do {
-            int start = pReader.getReadPointer() == fileOffsetStart ? tupleStart : 0;
-            int end = pReader.getReadPointer() == fileOffsetEnd ? tupleEnd : accessorProbe.getTupleCount();
-
-            for (int i = start; i < end; ++i) {
-                // Tuple has potential match from build phase
-                for (IFrameBufferManager fbm : buildFbms) {
-                    joinTupleWithMemoryPartition(accessorProbe, i, fbm, writer);
-                }
-            }
-        } while (pReader.getReadPointer() < fileOffsetEnd && loadReaderNextFrame(pReader));
-    }
-
-    private boolean loadReaderNextFrame(RunFileReader pReader) throws HyracksDataException {
-        if (pReader.nextFrame(reloadBuffer)) {
-            accessorProbe.reset(reloadBuffer.getBuffer());
-            spillReadCount++;
-            return true;
-        }
-        return false;
-    }
-
-    public void joinTupleWithMemoryPartition(IFrameTupleAccessor accessorProbe, int probeTupleIndex,
-            IFrameBufferManager fbm, IFrameWriter writer) throws HyracksDataException {
-        if (fbm.getNumFrames() == 0) {
-            return;
-        }
-        fbm.resetIterator();
-        int frameIndex = fbm.next();
-        while (fbm.exists()) {
-            fbm.getFrame(frameIndex, bufferInfo);
-            accessorBuild.reset(bufferInfo.getBuffer());
-            for (int buildTupleIndex = 0; buildTupleIndex < accessorBuild.getTupleCount(); ++buildTupleIndex) {
-                if (imjc.checkToSaveInResult(accessorBuild, buildTupleIndex, accessorProbe, probeTupleIndex, false)) {
-                    appendToResult(accessorBuild, buildTupleIndex, accessorProbe, probeTupleIndex, writer);
-                }
-                joinComparisonCount++;
-            }
-            frameIndex = fbm.next();
-        }
-    }
-
-    private void appendToResult(IFrameTupleAccessor accessorBuild, int buildSidetIx, IFrameTupleAccessor accessorProbe,
-            int probeSidetIx, IFrameWriter writer) throws HyracksDataException {
-        FrameUtils.appendConcatToWriter(writer, resultAppender, accessorBuild, buildSidetIx, accessorProbe,
-                probeSidetIx);
-        joinResultCount++;
-    }
-
-    private void fillMemory() throws HyracksDataException {
-        int buildPid = -1;
-        TupleStatus ts;
-        for (ts = loadRightTuple(); ts.isLoaded(); ts = loadRightTuple()) {
-            int pid = buildHpc.partition(inputAccessor[RIGHT_PARTITION], inputAccessor[RIGHT_PARTITION].getTupleId(),
-                    k);
-            if (!buildBufferManager.insertTuple(pid, inputAccessor[RIGHT_PARTITION],
-                    inputAccessor[RIGHT_PARTITION].getTupleId(), tempPtr)) {
-                return;
-            }
-
-            if (buildPid != pid) {
-                // Track new partitions in memory.
-                buildInMemoryPartitions.add(pid);
-                buildPid = pid;
-            }
-            inputAccessor[RIGHT_PARTITION].next();
-            buildSize++;
-        }
-        if (ts.isEmpty()) {
-            moreBuildProcessing = false;
-        }
-    }
-
-    private TupleStatus loadRightTuple() throws HyracksDataException {
-        TupleStatus loaded = loadMemoryTuple(RIGHT_PARTITION);
-        if (loaded == TupleStatus.UNKNOWN) {
-            loaded = pauseAndLoadRightTuple();
-        }
-        return loaded;
-    }
-
-}