Posted to notifications@asterixdb.apache.org by "Shiva Jahangiri (Code Review)" <de...@asterixdb.apache.org> on 2019/06/23 01:42:12 UTC

Change in asterixdb[master]: NoGrow-NoSteal algorithm for large objects in hybrid hash join.

Shiva Jahangiri has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/3454 )


Change subject: NoGrow-NoSteal algorithm for large objects in hybrid hash join.
......................................................................

NoGrow-NoSteal algorithm for large objects in hybrid hash join.

Change-Id: I25e26bb499a42077a611ebbebf56075704fc5e74
---
M hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/DeallocatableFramePool.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IDeallocatableFramePool.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
A hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HashJoinPartitionsManager.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
A hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/join/HashJoinPartitionsManagerTest.java
10 files changed, 985 insertions(+), 398 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/54/3454/1
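
A note on the algorithm, for readers of the diff below: the per-partition bookkeeping of the
hybrid hash join moves into the new HashJoinPartitionsManager, which also implements the
NoGrow-NoSteal policy for large records. A build record larger than one frame that hashes to an
already spilled partition is appended straight to that partition's run file, instead of growing
the in-memory pool or stealing frames from resident partitions. A minimal sketch of that insert
path (fields as in HashJoinPartitionsManager; writeDirectlyToRunFile and spillVictimPartition
are hypothetical stand-ins for flushBigObjectToDisk and selectAndSpillVictim):

    // Sketch only: the NoGrow-NoSteal decision for one build tuple.
    void insertBuildTuple(int pid, FrameTupleAccessor accessor, int tid) throws HyracksDataException {
        boolean isLargeRecord = accessor.getTupleLength(tid) > ctx.getInitialFrameSize();
        if (spilledStatus.get(pid) && isLargeRecord) {
            // No grow, no steal: bypass the buffer pool and write the record to disk.
            writeDirectlyToRunFile(pid, accessor, tid);
            return;
        }
        // Small record, or a partition that is still memory-resident: buffer it,
        // spilling a victim partition whenever the pool runs out of frames.
        while (!bufferManager.insertTuple(pid, accessor, tid, tempPtr)) {
            spillVictimPartition(pid);
        }
    }

On the probe side the same idea applies, except that a record is only written straight to disk
when it is larger than all of the memory reserved for the probe phase (one frame per spilled
partition).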

diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml b/hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml
index abaf146..0713725 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml
@@ -99,5 +99,10 @@
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-api</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/DeallocatableFramePool.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/DeallocatableFramePool.java
index 47b11ce..3bc4fdc 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/DeallocatableFramePool.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/DeallocatableFramePool.java
@@ -29,7 +29,7 @@
 public class DeallocatableFramePool implements IDeallocatableFramePool {
 
     private final IHyracksFrameMgrContext ctx;
-    private final int memBudget;
+    private int memBudget;
     private int allocated;
     private LinkedList<ByteBuffer> buffers;
 
@@ -125,4 +125,27 @@
         allocated = 0;
         buffers.clear();
     }
+
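+    /**
+     * Hands every cached (free) frame in the pool back to the context. Frames that are
+     * currently handed out to callers are not in this list and remain allocated.
+     */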
+    @Override
+    public void releaseUnusedMemory() {
+        for (Iterator<ByteBuffer> iter = buffers.iterator(); iter.hasNext();) {
+            ByteBuffer next = iter.next();
+            allocated -= next.capacity();
+            ctx.deallocateFrames(next.capacity());
+            iter.remove();
+        }
+        buffers.clear();
+    }
+
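+    /**
+     * @return the number of bytes this pool has currently allocated.
+     */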
+    @Override
+    public int getSizeOfAllocatedMemory() {
+        return allocated;
+    }
+
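+    /**
+     * Shrinks the pool's budget; the new budget must cover what has already been allocated.
+     */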
+    @Override
+    public void decreaseMemoryBudget(int budget) {
+        assert budget >= allocated : "The new memory budget cannot be lower than what has already been allocated.";
+        memBudget = budget;
+    }
+
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IDeallocatableFramePool.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IDeallocatableFramePool.java
index 39426c1..54168ef 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IDeallocatableFramePool.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IDeallocatableFramePool.java
@@ -25,4 +25,10 @@
 
     void deAllocateBuffer(ByteBuffer buffer);
 
+    void releaseUnusedMemory();
+
+    int getSizeOfAllocatedMemory();
+
+    void decreaseMemoryBudget(int budget);
+
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
index 6c08be2..a529318 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
@@ -37,14 +37,16 @@
      * Insert tuple from (byte[] byteArray,int[] fieldEndOffsets, int start, int size) into
      * specified partition. The handle is written into the tuplepointer.
      * <br>
-     * If {@code byteArray} contains the {@code fieldEndOffsets} already, then please set the {@code fieldEndOffsets} as NULL
+     * If {@code byteArray} contains the {@code fieldEndOffsets} already, then please set the
+     * {@code fieldEndOffsets} as NULL
      *
      * @param partition
      *            the id of the partition to insert the tuple into
      * @param byteArray
      *            the byteArray which contains the tuple
      * @param fieldEndOffsets
-     *            the fieldEndOffsets which comes from the ArrayTupleBuilder, please set it to NULL if the {@code byteArray} already contains the fieldEndOffsets
+     *            the fieldEndOffsets which comes from the ArrayTupleBuilder, please set it to
+     *            NULL if the {@code byteArray} already contains the fieldEndOffsets
      * @param start
      *            the start offset in the {@code byteArray}
      * @param size
@@ -67,7 +69,7 @@
      * @param tupleId
      *            the id of the tuple from the tupleAccessor
      * @param pointer
-     *            the returned pointer indicating the handler to later fetch the tuple from the buffer maanager
+     *            the returned pointer indicating the handler to later fetch the tuple from the buffer manager
      * @return true if the insertion succeed. Otherwise return false.
      * @throws HyracksDataException
      */
@@ -80,7 +82,8 @@
     void cancelInsertTuple(int partition) throws HyracksDataException;
 
     /**
-     * Reset to the initial states. The previous allocated resources won't be released in order to be used in the next round.
+     * Reset to the initial states. The previous allocated resources won't be released in order to be used in the
+     * next round.
      *
      * @throws HyracksDataException
      */
@@ -113,4 +116,15 @@
      */
     void clearPartition(int partition) throws HyracksDataException;
 
+    /**
+     * Releases the frames that are currently allocated but not in use.
+     */
+    void releaseUnusedMemory();
+
+    /**
+     * Returns the size of the allocated frames.
+     * @return size of allocated memory, in bytes.
+     */
+    int getSizeOfAllocatedMemory();
+
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
index 4578c2e..c515fb2 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
@@ -204,7 +204,7 @@
 
     private int appendTupleToBuffer(BufferInfo bufferInfo, int[] fieldEndOffsets, byte[] byteArray, int start, int size)
             throws HyracksDataException {
-        assert (bufferInfo.getStartOffset() == 0) : "Haven't supported yet in FrameTupleAppender";
+        assert bufferInfo.getStartOffset() == 0 : "Haven't supported yet in FrameTupleAppender";
         if (bufferInfo.getBuffer() != appendFrame.getBuffer()) {
             appendFrame.reset(bufferInfo.getBuffer());
             appender.reset(appendFrame, false);
@@ -288,4 +288,14 @@
 
     }
 
+    @Override
+    public void releaseUnusedMemory() {
+        framePool.releaseUnusedMemory();
+    }
+
+    @Override
+    public int getSizeOfAllocatedMemory() {
+        return framePool.getSizeOfAllocatedMemory();
+    }
+
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HashJoinPartitionsManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HashJoinPartitionsManager.java
new file mode 100644
index 0000000..b384ada
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HashJoinPartitionsManager.java
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.join;
+
+import java.util.Arrays;
+import java.util.BitSet;
+
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.io.RunFileReader;
+import org.apache.hyracks.dataflow.common.io.RunFileWriter;
+import org.apache.hyracks.dataflow.std.buffermanager.DeallocatableFramePool;
+import org.apache.hyracks.dataflow.std.buffermanager.IDeallocatableFramePool;
+import org.apache.hyracks.dataflow.std.buffermanager.IPartitionedTupleBufferManager;
+import org.apache.hyracks.dataflow.std.buffermanager.PreferToSpillFullyOccupiedFramePolicy;
+import org.apache.hyracks.dataflow.std.buffermanager.VPartitionTupleBufferManager;
+import org.apache.hyracks.dataflow.std.structures.TuplePointer;
+
+public class HashJoinPartitionsManager {
+
+    private final BitSet spilledStatus; //0=resident, 1=spilled
+    private final int numOfPartitions;
+    private int[] buildPSizeInTups;
+    private int[] probePSizeInTups;
+    private RunFileWriter[] buildRFWriters; //writing spilled build partitions
+    private RunFileWriter[] probeRFWriters; //writing spilled probe partitions
+    private final String buildRelName;
+    private final String probeRelName;
+    private final IHyracksTaskContext ctx;
+    private final int memSizeInFrames;
+    private IPartitionedTupleBufferManager bufferManager;
+    private PreferToSpillFullyOccupiedFramePolicy spillPolicy;
+    private IFrame reloadBuffer;
+    // This is a reusable object to store the pointer.
+    private TuplePointer tempPtr = new TuplePointer();
+    // We mainly use it to match the corresponding function signature.
+    private final FrameTupleAccessor accessorBuild;
+    private final FrameTupleAccessor accessorProbe;
+    // Used for big objects (build or probe) which cannot be held in the join memory
+    private FrameTupleAppender bigFrameAppender;
+    private IDeallocatableFramePool framePool;
+
+    public HashJoinPartitionsManager(IHyracksTaskContext ctx, int memSizeInFrames, int numOfPartitions,
+            FrameTupleAccessor accessorBuild, FrameTupleAccessor accessorProbe, String buildRelName,
+            String probeRelName) {
+        this.numOfPartitions = numOfPartitions;
+        this.spilledStatus = new BitSet(numOfPartitions);
+        this.buildPSizeInTups = new int[numOfPartitions];
+        this.probePSizeInTups = new int[numOfPartitions];
+        this.buildRFWriters = new RunFileWriter[numOfPartitions];
+        this.probeRFWriters = new RunFileWriter[numOfPartitions];
+        this.buildRelName = buildRelName;
+        this.probeRelName = probeRelName;
+        this.ctx = ctx;
+        this.memSizeInFrames = memSizeInFrames;
+        this.accessorBuild = accessorBuild;
+        this.accessorProbe = accessorProbe;
+    }
+
+    /**
+     * Initializes some data structures such as the frame pool, buffer manager, and spill policy.
+     * @throws HyracksDataException
+     */
+    void init() throws HyracksDataException {
+        framePool = new DeallocatableFramePool(ctx, memSizeInFrames * ctx.getInitialFrameSize());
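+        // The whole join budget is initially given to the partition buffers; the pool for
+        // the hash table is carved out of the leftover memory when the build side closes.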
+        bufferManager = new VPartitionTupleBufferManager(
+                PreferToSpillFullyOccupiedFramePolicy.createAtMostOneFrameForSpilledPartitionConstrain(spilledStatus),
+                numOfPartitions, framePool);
+        spillPolicy = new PreferToSpillFullyOccupiedFramePolicy(bufferManager, spilledStatus);
+    }
+
+    /**
+     * Increments the tuple count of the partition with id = pid for the given phase (whichSide).
+     * @param pid partition id.
+     * @param whichSide the phase(build or probe).
+     */
+    public void incrementTupleCount(int pid, OptimizedHybridHashJoin.SIDE whichSide) {
+        confirmValidPartitionAccess(pid);
+        switch (whichSide) {
+            case BUILD:
+                buildPSizeInTups[pid]++;
+                break;
+            case PROBE:
+                probePSizeInTups[pid]++;
+                break;
+        }
+    }
+
+    /**
+     * Tells whether the partition with id = pid is spilled.
+     * @param pid partition id
+     * @return true if partition is spilled.
+     */
+    boolean isPartitionSpilled(int pid) {
+        confirmValidPartitionAccess(pid);
+        return spilledStatus.get(pid);
+    }
+
+    /**
+     * Tells whether all of the build partitions are in memory.
+     * @return true if all of build partitions are in memory.
+     */
+    boolean isBuildRelAllInMemory() {
+        return spilledStatus.nextSetBit(0) < 0;
+    }
+
+    /**
+     * Closes the build writer for the partition with id = pid.
+     * @param pid partition id
+     * @throws HyracksDataException
+     */
+    void closeBuildPartitionWriter(int pid) throws HyracksDataException {
+        confirmValidPartitionAccess(pid);
+        if (buildRFWriters[pid] == null) {
+            throw new HyracksDataException("Tried to close the non-existing file writer.");
+        }
+        buildRFWriters[pid].close();
+    }
+
+    /**
+     * In case a failure happens, we need to clean up the generated temporary files.
+     */
+    public void clearBuildTempFiles() throws HyracksDataException {
+        for (RunFileWriter buildRFWriter : buildRFWriters) {
+            if (buildRFWriter != null) {
+                buildRFWriter.erase();
+            }
+        }
+    }
+
+    /**
+     * In case a failure happens, we need to clean up the generated temporary files.
+     */
+    public void clearProbeTempFiles() throws HyracksDataException {
+        for (RunFileWriter probeRFWriter : probeRFWriters) {
+            if (probeRFWriter != null) {
+                probeRFWriter.erase();
+            }
+        }
+    }
+
+    /**
+     * Closes all of the run file writers of the given side (phase).
+     * @param whichSide Build or Probe
+     * @throws HyracksDataException
+     */
+    private void closeAllSpilledPartitionWriters(OptimizedHybridHashJoin.SIDE whichSide) throws HyracksDataException {
+        RunFileWriter[] runFileWriters = null;
+        switch (whichSide) {
+            case BUILD:
+                runFileWriters = buildRFWriters;
+                break;
+            case PROBE:
+                runFileWriters = probeRFWriters;
+                break;
+        }
+        if (runFileWriters != null) {
+            for (RunFileWriter runFileWriter : runFileWriters) {
+                if (runFileWriter != null) {
+                    runFileWriter.close();
+                }
+            }
+        }
+    }
+
+    /**
+     * Gets the run file writer for the given partition and phase; if it does not exist yet,
+     * creates one and assigns it to that partition and phase.
+     * @param pid partition id
+     * @param whichSide Build or Probe
+     * @return RunFileWriter for the given partition and given phase
+     * @throws HyracksDataException
+     */
+    RunFileWriter getRunFileWriterOrCreateNewOneIfNotExist(int pid, OptimizedHybridHashJoin.SIDE whichSide)
+            throws HyracksDataException {
+        confirmValidPartitionAccess(pid);
+        RunFileWriter[] runFileWriters = null;
+        String refName = null;
+        switch (whichSide) {
+            case BUILD:
+                runFileWriters = buildRFWriters;
+                refName = buildRelName;
+                break;
+            case PROBE:
+                runFileWriters = probeRFWriters;
+                refName = probeRelName;
+                break;
+        }
+        assert runFileWriters != null : "Runfilewriter for side " + whichSide.toString() + " is not initialized.";
+        RunFileWriter writer = runFileWriters[pid];
+        if (writer == null) {
+            FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(refName);
+            writer = new RunFileWriter(file, ctx.getIoManager());
+            writer.open();
+            runFileWriters[pid] = writer;
+        }
+        return writer;
+    }
+
+    /**
+     * For every spilled partition of the given phase, this function flushes and deallocates the
+     * partition's frames and closes its run file writer.
+     * @param whichSide Build or Probe
+     * @throws HyracksDataException
+     */
+    void flushAndCloseAllSpilledPartitions(OptimizedHybridHashJoin.SIDE whichSide) throws HyracksDataException {
+
+        try {
+            for (int pid = spilledStatus.nextSetBit(0); pid >= 0 && pid < numOfPartitions; pid =
+                    spilledStatus.nextSetBit(pid + 1)) {
+                if (bufferManager.getNumTuples(pid) > 0) {
+                    bufferManager.flushPartition(pid, getRunFileWriterOrCreateNewOneIfNotExist(pid, whichSide));
+                    bufferManager.clearPartition(pid);
+                }
+            }
+        } finally {
+            // Force to close all run file writers.
+            closeAllSpilledPartitionWriters(whichSide);
+        }
+    }
+
+    /**
+     * Tries to insert the given tuple into the given partition during the build phase. If the insertion
+     * fails due to lack of memory, a victim partition is chosen based on the spill policy to make room
+     * for the record.
+     * @param tid Tuple id
+     * @param pid Partition id
+     * @return true if a partition was spilled while inserting this tuple.
+     * @throws HyracksDataException
+     */
+    public boolean processTupleBuildPhase(int tid, int pid) throws HyracksDataException {
+        boolean flushed = false;
+        boolean isLargeRecord = accessorBuild.getTupleLength(tid) > ctx.getInitialFrameSize();
+        boolean processed = false;
+        while (!processed) {
+            if (spilledStatus.get(pid) && isLargeRecord) {
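+                // NoGrow-NoSteal: a large record bound for a spilled partition bypasses the
+                // buffer pool and is appended straight to the partition's run file.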
+                flushBigObjectToDisk(pid, OptimizedHybridHashJoin.SIDE.BUILD, accessorBuild, tid);
+                processed = true;
+            } else { // Either the record is small or the partition is in memory (or it never received a tuple)
+                processed = bufferManager.insertTuple(pid, accessorBuild, tid, tempPtr);
+                if (!processed) {
+                    selectAndSpillVictim(pid);
+                    flushed = true;
+                }
+            }
+        }
+        incrementTupleCount(pid, OptimizedHybridHashJoin.SIDE.BUILD);
+        return flushed;
+    }
+
+    /**
+     * Tries to insert the given tuple into the given partition during the probe phase. If the tuple is
+     * larger than all of the memory given to the probe phase (one frame for each spilled partition), it
+     * is flushed directly to disk. Otherwise, we try to insert it into the given partition; if the
+     * insertion fails, the spilled partition with the maximum memory usage is flushed to disk until the
+     * insertion succeeds.
+     * @param tid Tuple id
+     * @param pid Partition id
+     * @throws HyracksDataException
+     */
+    void processTupleProbePhase(int tid, int pid) throws HyracksDataException {
+        if (accessorProbe.getTupleLength(tid) > spilledStatus.cardinality() * ctx.getInitialFrameSize()) {
+            //Record is larger than all memory given to probe phase.
+            //TODO: There could be other policies, like flush every record that is larger than a frame.
+            flushBigObjectToDisk(pid, OptimizedHybridHashJoin.SIDE.PROBE, accessorProbe, tid);
+        } else {
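+            // The tuple fits within the probe budget; keep spilling this partition itself
+            // (if it holds tuples) or the fullest spilled partition until the insert succeeds.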
+            while (!bufferManager.insertTuple(pid, accessorProbe, tid, tempPtr)) {
+                int victim = bufferManager.getNumTuples(pid) > 0 ? pid
+                        : spillPolicy.findSpilledPartitionWithMaxMemoryUsage();
+                if (victim < 0) {
+                    throw new HyracksDataException("Probe phase needs one frame reserved for each spilled partition.");
+                }
+                spillPartition(victim, OptimizedHybridHashJoin.SIDE.PROBE);
+            }
+        }
+    }
+
+    /**
+     * Selects a victim partition based on the spill policy, and then spills it to disk.
+     * @param pid Partition id
+     * @throws HyracksDataException
+     */
+    private void selectAndSpillVictim(int pid) throws HyracksDataException {
+        int victimPartition = spillPolicy.selectVictimPartition(pid);
+        if (victimPartition < 0) {
+            throw new HyracksDataException(
+                    "No more space left in the memory buffer, please assign more memory to hash-join.");
+        }
+        spillPartition(victimPartition, OptimizedHybridHashJoin.SIDE.BUILD);
+    }
+
+    /**
+     * Flushes the given partition from the given phase to the disk and deallocates its frames.
+     * @param pid Partition id
+     * @param whichSide Build or Probe
+     * @throws HyracksDataException
+     */
+    void spillPartition(int pid, OptimizedHybridHashJoin.SIDE whichSide) throws HyracksDataException {
+        RunFileWriter writer = getRunFileWriterOrCreateNewOneIfNotExist(pid, whichSide);
+        bufferManager.flushPartition(pid, writer);
+        bufferManager.clearPartition(pid);
+        if (whichSide == OptimizedHybridHashJoin.SIDE.BUILD) {
+            setSpilledStatusForPartition(pid);
+        }
+    }
+
+    /**
+     * Flushes the given tuple directly to disk.
+     * @param pid Partition id
+     * @param whichSide Build or Probe
+     * @param accessor Tuple accessor within a frame
+     * @param tid Tuple id
+     * @throws HyracksDataException
+     */
+    /* TODO: Currently the memory for staging this big object before flushing comes from the Hyracks
+     * context, which means it is not counted against the memory budget and may push memory usage
+     * beyond the budget. */
+    private void flushBigObjectToDisk(int pid, OptimizedHybridHashJoin.SIDE whichSide, FrameTupleAccessor accessor,
+            int tid) throws HyracksDataException {
+        if (bigFrameAppender == null) {
+            bigFrameAppender = new FrameTupleAppender(new VSizeFrame(ctx));
+        }
+        RunFileWriter runFileWriter = getRunFileWriterOrCreateNewOneIfNotExist(pid, whichSide);
+        if (!bigFrameAppender.append(accessor, tid)) {
+            throw new HyracksDataException("The given tuple is too big");
+        }
+        bigFrameAppender.write(runFileWriter, true);
+    }
+
+    /**
+     * Brings the partition with id = pid back into memory.
+     * @param pid partition id
+     * @return true if the partition was successfully read into memory.
+     * @throws HyracksDataException
+     */
+    /* TODO: Currently the memory for the reload buffer comes from the Hyracks context, which means it
+     * is not counted against the memory budget and may push memory usage beyond the budget. */
+    public boolean loadSpilledPartitionToMem(int pid) throws HyracksDataException {
+        RunFileReader r = buildRFWriters[pid].createReader();
+        try {
+            r.open();
+            if (reloadBuffer == null) {
+                reloadBuffer = new VSizeFrame(ctx);
+            }
+            while (r.nextFrame(reloadBuffer)) {
+                accessorBuild.reset(reloadBuffer.getBuffer());
+                for (int tid = 0; tid < accessorBuild.getTupleCount(); tid++) {
+                    if (bufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) {
+                        continue;
+                    }
+                    // for some reason (e.g. due to fragmentation) if the inserting failed,
+                    // we need to clear the occupied frames
+                    bufferManager.clearPartition(pid);
+                    return false;
+                }
+            }
+            // Closes and deletes the run file if it is already loaded into memory.
+            r.setDeleteAfterClose(true);
+        } finally {
+            r.close();
+        }
+        unsetSpilledStatusForPartition(pid);
+        setRunFileWriter(pid, OptimizedHybridHashJoin.SIDE.BUILD, null);
+        return true;
+    }
+
+    /**
+     * Flushes all partitions to disk and releases their frames. The buffer manager is closed as well.
+     * @throws HyracksDataException
+     */
+    public void releaseResources() throws HyracksDataException {
+        flushAndCloseAllSpilledPartitions(OptimizedHybridHashJoin.SIDE.PROBE);
+        bufferManager.close();
+        bufferManager = null;
+    }
+
+    /**
+     * Releases the buffer manager's frames that are allocated but not currently in use.
+     */
+    public void releaseUnusedMemory() {
+        bufferManager.releaseUnusedMemory();
+    }
+
+    /**
+     * Shrinks the memory budget of the frame pool to the amount of memory that is still needed.
+     */
+    public void updateMemoryBudget() {
+        int unused = getSizeOfUnusedMemory();
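+        // New budget = total budget - unused memory, i.e. the allocated frames plus the
+        // one frame reserved for each spilled partition.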
+        framePool.decreaseMemoryBudget(memSizeInFrames * ctx.getInitialFrameSize() - unused);
+    }
+
+    /**
+     * Makes sure that the given partition id is valid, i.e., within the range of the number of partitions.
+     * @param pid partition id
+     */
+    private void confirmValidPartitionAccess(int pid) {
+        assert pid >= 0 && pid < numOfPartitions : "Accessing a partition out of range.";
+    }
+
+    /************************
+     *
+     * Getters And Setters
+     *
+     ***********************/
+
+    public IPartitionedTupleBufferManager getBufferManager() {
+        return bufferManager;
+    }
+
+    public void setBufferManager(IPartitionedTupleBufferManager bufferManager) {
+        this.bufferManager = bufferManager;
+    }
+
+    public int getSizeOfUnusedMemory() {
+        // One frame stays reserved for each spilled partition.
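+        // e.g., with a 32-frame budget, 20 frames allocated, and 4 spilled partitions
+        // (frame size F): unused = 32*F - (20*F + 4*F) = 8*F.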
+        return memSizeInFrames * ctx.getInitialFrameSize()
+                - (bufferManager.getSizeOfAllocatedMemory() + spilledStatus.cardinality() * ctx.getInitialFrameSize());
+    }
+
+    public RunFileWriter[] getBuildRFWriters() {
+        return buildRFWriters;
+    }
+
+    public void setRunFileWriter(int pid, OptimizedHybridHashJoin.SIDE whichSide, RunFileWriter rfw) {
+        confirmValidPartitionAccess(pid);
+        switch (whichSide) {
+            case BUILD:
+                buildRFWriters[pid] = rfw;
+                break;
+            case PROBE:
+                probeRFWriters[pid] = rfw;
+                break;
+        }
+
+    }
+
+    public BitSet getSpilledStatus() {
+        return spilledStatus;
+    }
+
+    private void setSpilledStatusForPartition(int pid) {
+        confirmValidPartitionAccess(pid);
+        spilledStatus.set(pid);
+    }
+
+    private void unsetSpilledStatusForPartition(int pid) {
+        confirmValidPartitionAccess(pid);
+        spilledStatus.set(pid, false);
+    }
+
+    public int getMaxPartitionSize(OptimizedHybridHashJoin.SIDE whichSide) throws HyracksDataException {
+        switch (whichSide) {
+            case BUILD:
+                return Arrays.stream(buildPSizeInTups).max().getAsInt();
+            case PROBE:
+                return Arrays.stream(probePSizeInTups).max().getAsInt();
+            default:
+                throw new HyracksDataException("Unknown join phase. It has to be either build or probe.");
+        }
+    }
+
+    public int getTuplesCount(int pid, OptimizedHybridHashJoin.SIDE whichSide) throws HyracksDataException {
+        confirmValidPartitionAccess(pid);
+        switch (whichSide) {
+            case BUILD:
+                return buildPSizeInTups[pid];
+            case PROBE:
+                return probePSizeInTups[pid];
+            default:
+                throw new HyracksDataException("Unknown join phase. It has to be either build or probe.");
+        }
+    }
+
+    public RunFileReader getBuildRFReader(int pid) throws HyracksDataException {
+        return buildRFWriters[pid] == null ? null : buildRFWriters[pid].createDeleteOnCloseReader();
+    }
+
+    public RunFileReader getProbeRFReader(int pid) throws HyracksDataException {
+        return probeRFWriters[pid] == null ? null : probeRFWriters[pid].createDeleteOnCloseReader();
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index c78e0dc..5139b2c 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -21,9 +21,7 @@
 import java.nio.ByteBuffer;
 import java.util.BitSet;
 
-import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IMissingWriter;
 import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
@@ -32,30 +30,20 @@
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.io.RunFileReader;
-import org.apache.hyracks.dataflow.common.io.RunFileWriter;
 import org.apache.hyracks.dataflow.std.buffermanager.DeallocatableFramePool;
 import org.apache.hyracks.dataflow.std.buffermanager.FramePoolBackedFrameBufferManager;
 import org.apache.hyracks.dataflow.std.buffermanager.IDeallocatableFramePool;
 import org.apache.hyracks.dataflow.std.buffermanager.IPartitionedTupleBufferManager;
 import org.apache.hyracks.dataflow.std.buffermanager.ISimpleFrameBufferManager;
-import org.apache.hyracks.dataflow.std.buffermanager.PreferToSpillFullyOccupiedFramePolicy;
-import org.apache.hyracks.dataflow.std.buffermanager.VPartitionTupleBufferManager;
 import org.apache.hyracks.dataflow.std.structures.ISerializableTable;
 import org.apache.hyracks.dataflow.std.structures.SerializableHashTable;
-import org.apache.hyracks.dataflow.std.structures.TuplePointer;
 
 /**
  * This class mainly applies one level of HHJ on a pair of
  * relations. It is always called by the descriptor.
  */
 public class OptimizedHybridHashJoin {
-
-    // Used for special probe BigObject which can not be held into the Join memory
-    private FrameTupleAppender bigProbeFrameAppender;
 
     public enum SIDE {
         BUILD,
@@ -64,9 +52,6 @@
 
     private IHyracksTaskContext ctx;
 
-    private final String buildRelName;
-    private final String probeRelName;
-
     private final ITuplePairComparator comparator;
     private final ITuplePartitionComputer buildHpc;
     private final ITuplePartitionComputer probeHpc;
@@ -74,40 +59,30 @@
     private final RecordDescriptor buildRd;
     private final RecordDescriptor probeRd;
 
-    private RunFileWriter[] buildRFWriters; //writing spilled build partitions
-    private RunFileWriter[] probeRFWriters; //writing spilled probe partitions
-
     private final IPredicateEvaluator predEvaluator;
     private final boolean isLeftOuter;
     private final IMissingWriter[] nonMatchWriters;
 
-    private final BitSet spilledStatus; //0=resident, 1=spilled
     private final int numOfPartitions;
     private final int memSizeInFrames;
     private InMemoryHashJoin inMemJoiner; //Used for joining resident partitions
-
-    private IPartitionedTupleBufferManager bufferManager;
-    private PreferToSpillFullyOccupiedFramePolicy spillPolicy;
 
     private final FrameTupleAccessor accessorBuild;
     private final FrameTupleAccessor accessorProbe;
 
     private IDeallocatableFramePool framePool;
     private ISimpleFrameBufferManager bufferManagerForHashTable;
+    private HashJoinPartitionsManager partitionsManager;
 
-    private boolean isReversed; //Added for handling correct calling for predicate-evaluator upon recursive calls that cause role-reversal
+    private boolean debug = false;
 
-    // stats information
-    private int[] buildPSizeInTups;
-    private IFrame reloadBuffer;
-    private TuplePointer tempPtr = new TuplePointer(); // this is a reusable object to store the pointer,which is not used anywhere.
-                                                       // we mainly use it to match the corresponding function signature.
-    private int[] probePSizeInTups;
+    // Added for correctly calling the predicate evaluator upon recursive calls that cause role reversal
+    private boolean isReversed;
 
-    public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memSizeInFrames, int numOfPartitions,
-            String probeRelName, String buildRelName, ITuplePairComparator comparator, RecordDescriptor probeRd,
-            RecordDescriptor buildRd, ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc,
-            IPredicateEvaluator predEval, boolean isLeftOuter, IMissingWriterFactory[] nullWriterFactories1) {
+    OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memSizeInFrames, int numOfPartitions, String probeRelName,
+            String buildRelName, ITuplePairComparator comparator, RecordDescriptor probeRd, RecordDescriptor buildRd,
+            ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc, IPredicateEvaluator predEval,
+            boolean isLeftOuter, IMissingWriterFactory[] nullWriterFactories1) {
         this.ctx = ctx;
         this.memSizeInFrames = memSizeInFrames;
         this.buildRd = buildRd;
@@ -115,12 +90,7 @@
         this.buildHpc = buildHpc;
         this.probeHpc = probeHpc;
         this.comparator = comparator;
-        this.buildRelName = buildRelName;
-        this.probeRelName = probeRelName;
-
         this.numOfPartitions = numOfPartitions;
-        this.buildRFWriters = new RunFileWriter[numOfPartitions];
-        this.probeRFWriters = new RunFileWriter[numOfPartitions];
 
         this.accessorBuild = new FrameTupleAccessor(buildRd);
         this.accessorProbe = new FrameTupleAccessor(probeRd);
@@ -129,7 +99,8 @@
         this.isLeftOuter = isLeftOuter;
         this.isReversed = false;
 
-        this.spilledStatus = new BitSet(numOfPartitions);
+        this.partitionsManager = new HashJoinPartitionsManager(ctx, memSizeInFrames, numOfPartitions, accessorBuild,
+                accessorProbe, buildRelName, probeRelName);
 
         this.nonMatchWriters = isLeftOuter ? new IMissingWriter[nullWriterFactories1.length] : null;
         if (isLeftOuter) {
@@ -139,134 +110,73 @@
         }
     }
 
-    public void initBuild() throws HyracksDataException {
-        framePool = new DeallocatableFramePool(ctx, memSizeInFrames * ctx.getInitialFrameSize());
-        bufferManagerForHashTable = new FramePoolBackedFrameBufferManager(framePool);
-        bufferManager = new VPartitionTupleBufferManager(
-                PreferToSpillFullyOccupiedFramePolicy.createAtMostOneFrameForSpilledPartitionConstrain(spilledStatus),
-                numOfPartitions, framePool);
-        spillPolicy = new PreferToSpillFullyOccupiedFramePolicy(bufferManager, spilledStatus);
-        spilledStatus.clear();
-        buildPSizeInTups = new int[numOfPartitions];
+    /**
+     * Initializes the partition manager.
+     * @throws HyracksDataException
+     */
+    void initBuild() throws HyracksDataException {
+        partitionsManager.init();
     }
 
+    /**
+     * Processes the tuples of the incoming buffer and inserts each of them into its target partition.
+     * @param buffer the incoming frame containing build tuples
+     * @throws HyracksDataException
+     */
     public void build(ByteBuffer buffer) throws HyracksDataException {
         accessorBuild.reset(buffer);
         int tupleCount = accessorBuild.getTupleCount();
-
         for (int i = 0; i < tupleCount; ++i) {
             int pid = buildHpc.partition(accessorBuild, i, numOfPartitions);
-            processTuple(i, pid);
-            buildPSizeInTups[pid]++;
-        }
-
-    }
-
-    private void processTuple(int tid, int pid) throws HyracksDataException {
-        while (!bufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) {
-            selectAndSpillVictim(pid);
+            partitionsManager.processTupleBuildPhase(i, pid);
         }
     }
 
-    private void selectAndSpillVictim(int pid) throws HyracksDataException {
-        int victimPartition = spillPolicy.selectVictimPartition(pid);
-        if (victimPartition < 0) {
-            throw new HyracksDataException(
-                    "No more space left in the memory buffer, please assign more memory to hash-join.");
-        }
-        spillPartition(victimPartition);
-    }
-
-    private void spillPartition(int pid) throws HyracksDataException {
-        RunFileWriter writer = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.BUILD);
-        bufferManager.flushPartition(pid, writer);
-        bufferManager.clearPartition(pid);
-        spilledStatus.set(pid);
-    }
-
-    private void closeBuildPartition(int pid) throws HyracksDataException {
-        if (buildRFWriters[pid] == null) {
-            throw new HyracksDataException("Tried to close the non-existing file writer.");
-        }
-        buildRFWriters[pid].close();
-    }
-
-    private RunFileWriter getSpillWriterOrCreateNewOneIfNotExist(int pid, SIDE whichSide) throws HyracksDataException {
-        RunFileWriter[] runFileWriters = null;
-        String refName = null;
-        switch (whichSide) {
-            case BUILD:
-                runFileWriters = buildRFWriters;
-                refName = buildRelName;
-                break;
-            case PROBE:
-                refName = probeRelName;
-                runFileWriters = probeRFWriters;
-                break;
-        }
-        RunFileWriter writer = runFileWriters[pid];
-        if (writer == null) {
-            FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(refName);
-            writer = new RunFileWriter(file, ctx.getIoManager());
-            writer.open();
-            runFileWriters[pid] = writer;
-        }
-        return writer;
-    }
-
-    public void closeBuild() throws HyracksDataException {
+    /**
+     * Called at the end of the build phase. It flushes all of the spilled partitions to disk, makes
+     * enough space for the hash table, and then releases any extra memory left over from the
+     * partition manager. If any of the partitions is resident in memory, the leftover memory from
+     * the partition manager is used to build the hash table and load data into it.
+     *
+     * @throws HyracksDataException
+     */
+    void closeBuild() throws HyracksDataException {
         // Flushes the remaining chunks of the all spilled partitions to the disk.
-        closeAllSpilledPartitions(SIDE.BUILD);
+        partitionsManager.flushAndCloseAllSpilledPartitions(SIDE.BUILD);
 
         // Makes the space for the in-memory hash table (some partitions may need to be spilled to the disk
         // during this step in order to make the space.)
         // and tries to bring back as many spilled partitions as possible if there is free space.
         int inMemTupCount = makeSpaceForHashTableAndBringBackSpilledPartitions();
 
+        // At this point, all partitions are final (no reload or spill), so we can release the unused
+        // memory to be used by the buffer manager of the hash table. Note that when loading data into
+        // the in-memory join, the frames of the resident partitions get copied into the in-memory
+        // join, so they are not considered for release.
+        partitionsManager.releaseUnusedMemory();
+
+        // Updates (reduces, or leaves unchanged) the memory budget of the partition manager to the
+        // allocated memory, making sure we do not exceed the overall budget: the memory budgets of the
+        // partition manager and the hash table should sum to the hash join memory budget.
+        partitionsManager.updateMemoryBudget();
+
+        initHashTable();
+
         createInMemoryJoiner(inMemTupCount);
 
         loadDataInMemJoin();
+
+        checkOneFrameReservedPerSpilledPartitions();
     }
 
     /**
-     * In case of failure happens, we need to clear up the generated temporary files.
+     * Initializes the frame pool and buffer manager for the hash table based on the leftover memory
+     * from the partition manager.
      */
-    public void clearBuildTempFiles() throws HyracksDataException {
-        for (int i = 0; i < buildRFWriters.length; i++) {
-            if (buildRFWriters[i] != null) {
-                buildRFWriters[i].erase();
-            }
-        }
-    }
-
-    private void closeAllSpilledPartitions(SIDE whichSide) throws HyracksDataException {
-        RunFileWriter[] runFileWriters = null;
-        switch (whichSide) {
-            case BUILD:
-                runFileWriters = buildRFWriters;
-                break;
-            case PROBE:
-                runFileWriters = probeRFWriters;
-                break;
-        }
-        try {
-            for (int pid = spilledStatus.nextSetBit(0); pid >= 0 && pid < numOfPartitions; pid =
-                    spilledStatus.nextSetBit(pid + 1)) {
-                if (bufferManager.getNumTuples(pid) > 0) {
-                    bufferManager.flushPartition(pid, getSpillWriterOrCreateNewOneIfNotExist(pid, whichSide));
-                    bufferManager.clearPartition(pid);
-                }
-            }
-        } finally {
-            // Force to close all run file writers.
-            if (runFileWriters != null) {
-                for (RunFileWriter runFileWriter : runFileWriters) {
-                    if (runFileWriter != null) {
-                        runFileWriter.close();
-                    }
-                }
-            }
-        }
+    private void initHashTable() {
+        int size = partitionsManager.getSizeOfUnusedMemory();
+        framePool = new DeallocatableFramePool(ctx, size);
+        bufferManagerForHashTable = new FramePoolBackedFrameBufferManager(framePool);
     }
 
     /**
@@ -278,93 +188,119 @@
      * @throws HyracksDataException
      */
     private int makeSpaceForHashTableAndBringBackSpilledPartitions() throws HyracksDataException {
-        // we need number of |spilledPartitions| buffers to store the probe data
-        int frameSize = ctx.getInitialFrameSize();
-        long freeSpace = (long) (memSizeInFrames - spilledStatus.cardinality()) * frameSize;
 
-        // For partitions in main memory, we deduct their size from the free space.
+        int frameSize = ctx.getInitialFrameSize();
+        BitSet spilledStatus = partitionsManager.getSpilledStatus();
+        long freeSpace = partitionsManager.getSizeOfUnusedMemory();
+
         int inMemTupCount = 0;
         for (int p = spilledStatus.nextClearBit(0); p >= 0 && p < numOfPartitions; p =
                 spilledStatus.nextClearBit(p + 1)) {
-            freeSpace -= bufferManager.getPhysicalSize(p);
-            inMemTupCount += buildPSizeInTups[p];
+            inMemTupCount += partitionsManager.getTuplesCount(p, SIDE.BUILD);
         }
 
-        // Calculates the expected hash table size for the given number of tuples in main memory
-        // and deducts it from the free space.
         long hashTableByteSizeForInMemTuples = SerializableHashTable.getExpectedTableByteSize(inMemTupCount, frameSize);
-        freeSpace -= hashTableByteSizeForInMemTuples;
 
-        // In the case where free space is less than zero after considering the hash table size,
-        // we need to spill more partitions until we can accommodate the hash table in memory.
-        // TODO: there may be different policies (keep spilling minimum, spilling maximum, find a similar size to the
-        //                                        hash table, or keep spilling from the first partition)
-        boolean moreSpilled = false;
+        if (freeSpace - hashTableByteSizeForInMemTuples < 0) {
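+            // Not enough room for the hash table: spill some partitions, then shrink the
+            // expected hash table size to match the reduced number of in-memory tuples.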
+            int initialInMemTupCount = inMemTupCount;
+            inMemTupCount = spillPartitionsToAccommodateHashTable(hashTableByteSizeForInMemTuples, inMemTupCount);
+            long hashTableSizeDecrease =
+                    -SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(initialInMemTupCount,
+                            -(initialInMemTupCount - inMemTupCount), frameSize);
+            hashTableByteSizeForInMemTuples += hashTableSizeDecrease;
+        }
 
-        // No space to accommodate the hash table? Then, we spill one or more partitions to the disk.
-        if (freeSpace < 0) {
-            // Tries to find a best-fit partition not to spill many partitions.
-            int pidToSpill = selectSinglePartitionToSpill(freeSpace, inMemTupCount, frameSize);
-            if (pidToSpill >= 0) {
-                // There is a suitable one. We spill that partition to the disk.
-                long hashTableSizeDecrease =
-                        -SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(inMemTupCount,
-                                -buildPSizeInTups[pidToSpill], frameSize);
-                freeSpace = freeSpace + bufferManager.getPhysicalSize(pidToSpill) + hashTableSizeDecrease;
-                inMemTupCount -= buildPSizeInTups[pidToSpill];
-                spillPartition(pidToSpill);
-                closeBuildPartition(pidToSpill);
-                moreSpilled = true;
-            } else {
-                // There is no single suitable partition. So, we need to spill multiple partitions to the disk
-                // in order to accommodate the hash table.
-                for (int p = spilledStatus.nextClearBit(0); p >= 0 && p < numOfPartitions; p =
-                        spilledStatus.nextClearBit(p + 1)) {
-                    int spaceToBeReturned = bufferManager.getPhysicalSize(p);
-                    int numberOfTuplesToBeSpilled = buildPSizeInTups[p];
-                    if (spaceToBeReturned == 0 || numberOfTuplesToBeSpilled == 0) {
-                        continue;
-                    }
-                    spillPartition(p);
-                    closeBuildPartition(p);
-                    moreSpilled = true;
+        inMemTupCount = bringPartitionsBack(hashTableByteSizeForInMemTuples, inMemTupCount);
+
+        return inMemTupCount;
+    }
+
+    /**
+     * Spills some partitions in order to make space for the hash table.
+     *
+     * @param hashTableByteSizeForInMemTuples size of the current hash table
+     * @param inMemTupCount number of tuples that are in memory
+     * @return the number of tuples remaining in memory after spilling.
+     * @throws HyracksDataException
+     */
+    private int spillPartitionsToAccommodateHashTable(long hashTableByteSizeForInMemTuples, int inMemTupCount)
+            throws HyracksDataException {
+        int frameSize = ctx.getInitialFrameSize();
+        int currentMemoryCounts = inMemTupCount;
+        long currentHashTableSize = hashTableByteSizeForInMemTuples;
+        long freeSpace = calculateFreeSpaceForHashTable(currentHashTableSize);
+        int pidToSpill = selectSinglePartitionToSpill(freeSpace, currentMemoryCounts, frameSize);
+        if (pidToSpill >= 0) {
+            currentMemoryCounts -= partitionsManager.getTuplesCount(pidToSpill, SIDE.BUILD);
+            partitionsManager.spillPartition(pidToSpill, SIDE.BUILD);
+            partitionsManager.closeBuildPartitionWriter(pidToSpill);
+            if (debug) {
+                checkOneFrameReservedPerSpilledPartitions();
+            }
+        } else {
+            BitSet spilledStatus = partitionsManager.getSpilledStatus();
+            for (int p = spilledStatus.nextClearBit(0); p >= 0 && p < numOfPartitions; p =
+                    spilledStatus.nextClearBit(p + 1)) {
+                int spaceToBeReturned = partitionsManager.getBufferManager().getPhysicalSize(p);
+                int numberOfTuplesToBeSpilled = partitionsManager.getTuplesCount(p, SIDE.BUILD);
+                if (spaceToBeReturned > 0 && numberOfTuplesToBeSpilled > 0) {
+                    partitionsManager.spillPartition(p, SIDE.BUILD);
+                    partitionsManager.closeBuildPartitionWriter(p);
                     // Since the number of tuples in memory has been decreased,
                     // the hash table size will be decreased, too.
                     // We put minus since the method returns a negative value to represent a newly reclaimed space.
                     long expectedHashTableSizeDecrease =
-                            -SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(inMemTupCount,
+                            -SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(currentMemoryCounts,
                                     -numberOfTuplesToBeSpilled, frameSize);
-                    freeSpace = freeSpace + spaceToBeReturned + expectedHashTableSizeDecrease;
-                    // Adjusts the hash table size
-                    inMemTupCount -= numberOfTuplesToBeSpilled;
+                    currentHashTableSize += expectedHashTableSizeDecrease;
+                    freeSpace = calculateFreeSpaceForHashTable(currentHashTableSize);
+
+                    currentMemoryCounts -= numberOfTuplesToBeSpilled;
+                    if (debug) {
+                        checkOneFrameReservedPerSpilledPartitions();
+                    }
                     if (freeSpace >= 0) {
                         break;
                     }
                 }
             }
         }
+        return currentMemoryCounts;
+    }
 
-        // If more partitions have been spilled to the disk, calculate the expected hash table size again
-        // before bringing some partitions to main memory.
-        if (moreSpilled) {
-            hashTableByteSizeForInMemTuples = SerializableHashTable.getExpectedTableByteSize(inMemTupCount, frameSize);
-        }
-
-        // Brings back some partitions if there is enough free space.
-        int pid = 0;
-        while ((pid = selectPartitionsToReload(freeSpace, pid, inMemTupCount)) >= 0) {
-            if (!loadSpilledPartitionToMem(pid, buildRFWriters[pid])) {
+    /**
+     * Brings back some spilled partitions if there is free memory and partitions that fit in that space.
+     *
+     * @param hashTableByteSizeForInMemTuples current hash table size, used to recalculate the hash table size.
+     * @param inMemTupCount current number of tuples in memory, used to recalculate the hash table size.
+     * @return the number of in-memory tuples after bringing some (or no) partitions back into memory.
+     * @throws HyracksDataException
+     */
+    private int bringPartitionsBack(long hashTableByteSizeForInMemTuples, int inMemTupCount)
+            throws HyracksDataException {
+        int pid;
+        int currentMemoryTupleCount = inMemTupCount;
+        while ((pid = selectAPartitionToReload(hashTableByteSizeForInMemTuples, currentMemoryTupleCount)) >= 0) {
+            if (!partitionsManager.loadSpilledPartitionToMem(pid)) {
                 break;
             }
-            long expectedHashTableByteSizeIncrease = SerializableHashTable
-                    .calculateByteSizeDeltaForTableSizeChange(inMemTupCount, buildPSizeInTups[pid], frameSize);
-            freeSpace = freeSpace - bufferManager.getPhysicalSize(pid) - expectedHashTableByteSizeIncrease;
-            inMemTupCount += buildPSizeInTups[pid];
-            // Adjusts the hash table size
-            hashTableByteSizeForInMemTuples += expectedHashTableByteSizeIncrease;
+            currentMemoryTupleCount += partitionsManager.getTuplesCount(pid, SIDE.BUILD);
         }
+        if (debug) {
+            checkOneFrameReservedPerSpilledPartitions();
+        }
+        return currentMemoryTupleCount;
+    }
 
-        return inMemTupCount;
+    /**
+     * Calculates how much free space is left, based on the memory that is not used by the partition
+     * manager and the current hash table size.
+     *
+     * @param hashTableByteSizeForInMemTuples size of the hash table
+     * @return size of the free space not used by the partition manager or the hash table; it can be negative.
+     */
+    private long calculateFreeSpaceForHashTable(long hashTableByteSizeForInMemTuples) {
+        return partitionsManager.getSizeOfUnusedMemory() - hashTableByteSizeForInMemTuples;
     }
 
     /**
@@ -372,19 +308,24 @@
      *
      * @return the partition id that will be spilled to the disk. Returns -1 if there is no single suitable partition.
      */
-    private int selectSinglePartitionToSpill(long currentFreeSpace, int currentInMemTupCount, int frameSize) {
+    private int selectSinglePartitionToSpill(long currentFreeSpace, int currentInMemTupCount, int frameSize)
+            throws HyracksDataException {
         long spaceAfterSpill;
+        IPartitionedTupleBufferManager bufferManager = partitionsManager.getBufferManager();
         long minSpaceAfterSpill = (long) memSizeInFrames * frameSize;
         int minSpaceAfterSpillPartID = -1;
-
+        BitSet spilledStatus = partitionsManager.getSpilledStatus();
         for (int p = spilledStatus.nextClearBit(0); p >= 0 && p < numOfPartitions; p =
                 spilledStatus.nextClearBit(p + 1)) {
-            if (buildPSizeInTups[p] == 0 || bufferManager.getPhysicalSize(p) == 0) {
+            if (partitionsManager.getTuplesCount(p, SIDE.BUILD) == 0 || bufferManager.getPhysicalSize(p) == 0) {
                 continue;
             }
             // We put minus since the method returns a negative value to represent a newly reclaimed space.
-            spaceAfterSpill = currentFreeSpace + bufferManager.getPhysicalSize(p) + (-SerializableHashTable
-                    .calculateByteSizeDeltaForTableSizeChange(currentInMemTupCount, -buildPSizeInTups[p], frameSize));
+            // One frame size is deducted because, if this partition spills, one frame must
+            // remain reserved for it.
+            spaceAfterSpill = currentFreeSpace + bufferManager.getPhysicalSize(p) - ctx.getInitialFrameSize()
+                    + (-SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(currentInMemTupCount,
+                            -partitionsManager.getTuplesCount(p, SIDE.BUILD), frameSize));
             if (spaceAfterSpill == 0) {
                 // Found the perfect one. Just returns this partition.
                 return p;
@@ -397,49 +338,38 @@
         return minSpaceAfterSpillPartID;
     }
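
To make the selection arithmetic concrete, here is a minimal, self-contained sketch of
the policy above. It is an illustration, not the patch itself: pickVictim and its array
parameters are hypothetical stand-ins for the buffer manager and hash table queries.

    // Sketch of selectSinglePartitionToSpill(): among resident, non-empty partitions,
    // pick the one whose spill would leave the smallest non-negative free space.
    class SpillVictimSketch {
        static int pickVictim(long currentFreeSpace, long frameSize, long[] physicalSize,
                long[] hashTableShrink, boolean[] spilled) {
            long minSpaceAfterSpill = Long.MAX_VALUE;
            int minPid = -1;
            for (int p = 0; p < physicalSize.length; p++) {
                if (spilled[p] || physicalSize[p] == 0) {
                    continue; // only resident, non-empty partitions are candidates
                }
                // Reclaim the partition's frames plus the hash table shrink, minus the one
                // frame that must stay reserved for the partition once it is spilled.
                long spaceAfterSpill = currentFreeSpace + physicalSize[p] - frameSize + hashTableShrink[p];
                if (spaceAfterSpill == 0) {
                    return p; // perfect fit
                } else if (spaceAfterSpill > 0 && spaceAfterSpill < minSpaceAfterSpill) {
                    minSpaceAfterSpill = spaceAfterSpill;
                    minPid = p;
                }
            }
            return minPid;
        }
    }

For example, with an assumed 32 KB frame, 0 bytes currently free, a 96 KB resident
partition, and a 16 KB hash table shrink, spilling that partition leaves
0 + 96 - 32 + 16 = 80 KB free.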
 
-    private int selectPartitionsToReload(long freeSpace, int pid, int inMemTupCount) {
-        for (int i = spilledStatus.nextSetBit(pid); i >= 0 && i < numOfPartitions; i =
-                spilledStatus.nextSetBit(i + 1)) {
-            int spilledTupleCount = buildPSizeInTups[i];
-            // Expected hash table size increase after reloading this partition
-            long expectedHashTableByteSizeIncrease = SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(
-                    inMemTupCount, spilledTupleCount, ctx.getInitialFrameSize());
-            if (freeSpace >= buildRFWriters[i].getFileSize() + expectedHashTableByteSizeIncrease) {
-                return i;
+    /**
+     * Finds a spilled partition that can fit in the leftover memory.
+     * @param hashTableByteSizeForInMemTuples size of the current hash table
+     * @param inMemTupCount number of tuples currently in memory
+     * @return the id of a spilled partition that fits in the leftover memory, or -1 if none does.
+     * @throws HyracksDataException
+     */
+    private int selectAPartitionToReload(long hashTableByteSizeForInMemTuples, int inMemTupCount)
+            throws HyracksDataException {
+        long freeSpace = calculateFreeSpaceForHashTable(hashTableByteSizeForInMemTuples);
+        if (freeSpace > 0) {
+            BitSet spilledStatus = partitionsManager.getSpilledStatus();
+            for (int i = spilledStatus.nextSetBit(0); i >= 0 && i < numOfPartitions; i =
+                    spilledStatus.nextSetBit(i + 1)) {
+                int spilledTupleCount = partitionsManager.getTuplesCount(i, SIDE.BUILD);
+                // Expected hash table size increase after reloading this partition
+                long expectedHashTableByteSizeIncrease = SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(
+                        inMemTupCount, spilledTupleCount, ctx.getInitialFrameSize());
+                if (freeSpace >= partitionsManager.getBuildRFWriters()[i].getFileSize()
+                        + expectedHashTableByteSizeIncrease) {
+                    return i;
+                }
             }
         }
         return -1;
     }
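
Taken together, bringPartitionsBack() and selectAPartitionToReload() implement a simple
fixed-point loop: keep reloading the first spilled partition that fits until none does.
A self-contained sketch under stated assumptions, with delta() as a placeholder for
SerializableHashTable.calculateByteSizeDeltaForTableSizeChange and plain arrays standing
in for the partition manager's bookkeeping:

    class ReloadSketch {
        // Placeholder for calculateByteSizeDeltaForTableSizeChange: assumed byte growth
        // of the hash table when addedTuples tuples join the current in-memory tuples.
        static long delta(int inMemTuples, int addedTuples) {
            return 8L * addedTuples;
        }

        // Mirrors bringPartitionsBack(): reload partitions until nothing else fits.
        static int bringBack(long unusedBytes, long tableBytes, long[] runFileBytes,
                int[] tupleCounts, boolean[] spilled, int inMemTuples) {
            int pid;
            while ((pid = pick(unusedBytes - tableBytes, runFileBytes, tupleCounts, spilled,
                    inMemTuples)) >= 0) {
                spilled[pid] = false; // the partition is resident again
                // Simplification: treat the run-file size as the reloaded in-memory size.
                unusedBytes -= runFileBytes[pid];
                tableBytes += delta(inMemTuples, tupleCounts[pid]);
                inMemTuples += tupleCounts[pid];
            }
            return inMemTuples;
        }

        // Mirrors selectAPartitionToReload(): first spilled partition whose run file
        // plus the induced hash table growth fits in the leftover space, else -1.
        static int pick(long freeSpace, long[] runFileBytes, int[] tupleCounts,
                boolean[] spilled, int inMemTuples) {
            if (freeSpace <= 0) {
                return -1;
            }
            for (int i = 0; i < runFileBytes.length; i++) {
                if (spilled[i] && freeSpace >= runFileBytes[i] + delta(inMemTuples, tupleCounts[i])) {
                    return i;
                }
            }
            return -1;
        }
    }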
 
-    private boolean loadSpilledPartitionToMem(int pid, RunFileWriter wr) throws HyracksDataException {
-        RunFileReader r = wr.createReader();
-        try {
-            r.open();
-            if (reloadBuffer == null) {
-                reloadBuffer = new VSizeFrame(ctx);
-            }
-            while (r.nextFrame(reloadBuffer)) {
-                accessorBuild.reset(reloadBuffer.getBuffer());
-                for (int tid = 0; tid < accessorBuild.getTupleCount(); tid++) {
-                    if (bufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) {
-                        continue;
-                    }
-                    // for some reason (e.g. due to fragmentation) if the inserting failed,
-                    // we need to clear the occupied frames
-                    bufferManager.clearPartition(pid);
-                    return false;
-                }
-            }
-            // Closes and deletes the run file if it is already loaded into memory.
-            r.setDeleteAfterClose(true);
-        } finally {
-            r.close();
-        }
-        spilledStatus.set(pid, false);
-        buildRFWriters[pid] = null;
-        return true;
-    }
-
+    /**
+     * Initializes the hash table and the in-memory joiner.
+     * @param inMemTupCount number of tuples that are currently in memory.
+     * @throws HyracksDataException
+     */
     private void createInMemoryJoiner(int inMemTupCount) throws HyracksDataException {
         ISerializableTable table = new SerializableHashTable(inMemTupCount, ctx, bufferManagerForHashTable);
         this.inMemJoiner = new InMemoryHashJoin(ctx, new FrameTupleAccessor(probeRd), probeHpc,
@@ -447,13 +377,19 @@
                 predEvaluator, isReversed, bufferManagerForHashTable);
     }
 
+    /**
+     * Loads data from the in-memory partitions into the in-memory joiner.
+     * @throws HyracksDataException
+     */
     private void loadDataInMemJoin() throws HyracksDataException {
 
         for (int pid = 0; pid < numOfPartitions; pid++) {
-            if (!spilledStatus.get(pid)) {
-                bufferManager.flushPartition(pid, new IFrameWriter() {
+            if (!partitionsManager.isPartitionSpilled(pid)) {
+                partitionsManager.getBufferManager().flushPartition(pid, new IFrameWriter() {
                     @Override
                     public void open() throws HyracksDataException {
+                        // Not implemented: this writer is only used to pass the frames of the given
+                        // partition to the in-memory joiner, so only nextFrame() matters.
 
                     }
 
@@ -464,143 +400,88 @@
 
                     @Override
                     public void fail() throws HyracksDataException {
-
+                        // Not implemented: this writer is only used to pass the frames of the given
+                        // partition to the in-memory joiner, so only nextFrame() matters.
                     }
 
                     @Override
                     public void close() throws HyracksDataException {
-
+                        // Not implemented: this writer is only used to pass the frames of the given
+                        // partition to the in-memory joiner, so only nextFrame() matters.
                     }
                 });
             }
         }
     }
 
-    public void initProbe() throws HyracksDataException {
-
-        probePSizeInTups = new int[numOfPartitions];
-        probeRFWriters = new RunFileWriter[numOfPartitions];
-
-    }
-
+    /**
+     * Processes the tuples of the incoming probe buffer. Several cases can occur:
+     * 1) All partitions are in memory: just pass the buffer to the in-memory joiner.
+     * 2) At least one partition is spilled:
+     * 2-1) The join is not a left outer join and the corresponding build partition never
+     * received any record: the tuple has no potential match, so it is discarded.
+     * 2-2) The tuple has a potential match: check whether it belongs to a spilled partition or an
+     * in-memory partition. If it belongs to a spilled partition, add it to the output buffer of that
+     * partition; otherwise join it with the in-memory build tuples using the in-memory joiner.
+     * @param buffer the incoming buffer from the probe relation.
+     * @param writer the writer used by the in-memory joiner.
+     * @throws HyracksDataException
+     */
     public void probe(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
-        accessorProbe.reset(buffer);
-        int tupleCount = accessorProbe.getTupleCount();
 
-        if (isBuildRelAllInMemory()) {
+        if (partitionsManager.isBuildRelAllInMemory()) {
             inMemJoiner.join(buffer, writer);
             return;
         }
-        inMemJoiner.resetAccessorProbe(accessorProbe);
-        for (int i = 0; i < tupleCount; ++i) {
-            int pid = probeHpc.partition(accessorProbe, i, numOfPartitions);
 
-            if (buildPSizeInTups[pid] > 0 || isLeftOuter) { //Tuple has potential match from previous phase
-                if (spilledStatus.get(pid)) { //pid is Spilled
-                    while (!bufferManager.insertTuple(pid, accessorProbe, i, tempPtr)) {
-                        int victim = pid;
-                        if (bufferManager.getNumTuples(pid) == 0) { // current pid is empty, choose the biggest one
-                            victim = spillPolicy.findSpilledPartitionWithMaxMemoryUsage();
-                        }
-                        if (victim < 0) { // current tuple is too big for all the free space
-                            flushBigProbeObjectToDisk(pid, accessorProbe, i);
-                            break;
-                        }
-                        RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(victim, SIDE.PROBE);
-                        bufferManager.flushPartition(victim, runFileWriter);
-                        bufferManager.clearPartition(victim);
-                    }
+        accessorProbe.reset(buffer);
+        inMemJoiner.resetAccessorProbe(accessorProbe);
+
+        int tupleCount = accessorProbe.getTupleCount();
+        for (int tid = 0; tid < tupleCount; ++tid) {
+            int pid = probeHpc.partition(accessorProbe, tid, numOfPartitions);
+
+            if (partitionsManager.getTuplesCount(pid, SIDE.BUILD) > 0 || isLeftOuter) {
+                // Tuple has a potential match from the build phase
+                if (partitionsManager.getSpilledStatus().get(pid)) { //pid is Spilled
+                    partitionsManager.processTupleProbePhase(tid, pid);
                 } else { //pid is Resident
-                    inMemJoiner.join(i, writer);
+                    inMemJoiner.join(tid, writer);
                 }
-                probePSizeInTups[pid]++;
+                partitionsManager.incrementTupleCount(pid, SIDE.PROBE);
             }
         }
     }
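
The routing decision in probe() boils down to three outcomes per tuple. A minimal sketch,
with partition(), bufferForSpilled() and joinInMemory() as hypothetical stand-ins for the
hash partitioner, the partition manager's spill buffering, and the in-memory joiner:

    class ProbeRoutingSketch {
        static void routeProbeTuple(int tid, int numPartitions, int[] buildTupleCounts,
                boolean[] spilled, boolean isLeftOuter) {
            int pid = partition(tid, numPartitions);
            if (buildTupleCounts[pid] == 0 && !isLeftOuter) {
                return; // no potential match on the build side: discard the tuple
            }
            if (spilled[pid]) {
                bufferForSpilled(pid, tid); // joined later, once the build side is reloaded
            } else {
                joinInMemory(pid, tid); // resident partition: join immediately
            }
        }

        static int partition(int tid, int numPartitions) {
            return tid % numPartitions; // stand-in for probeHpc.partition(...)
        }

        static void bufferForSpilled(int pid, int tid) {
            // stand-in for partitionsManager.processTupleProbePhase(tid, pid)
        }

        static void joinInMemory(int pid, int tid) {
            // stand-in for inMemJoiner.join(tid, writer)
        }
    }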
 
-    private void flushBigProbeObjectToDisk(int pid, FrameTupleAccessor accessorProbe, int i)
-            throws HyracksDataException {
-        if (bigProbeFrameAppender == null) {
-            bigProbeFrameAppender = new FrameTupleAppender(new VSizeFrame(ctx));
-        }
-        RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.PROBE);
-        if (!bigProbeFrameAppender.append(accessorProbe, i)) {
-            throw new HyracksDataException("The given tuple is too big");
-        }
-        bigProbeFrameAppender.write(runFileWriter, true);
-    }
-
-    private boolean isBuildRelAllInMemory() {
-        return spilledStatus.nextSetBit(0) < 0;
-    }
-
-    public void completeProbe(IFrameWriter writer) throws HyracksDataException {
+    void completeProbe(IFrameWriter writer) throws HyracksDataException {
         //We do NOT join the spilled partitions here, that decision is made at the descriptor level
         //(which join technique to use)
         inMemJoiner.completeJoin(writer);
     }
 
-    public void releaseResource() throws HyracksDataException {
+    void releaseResource() throws HyracksDataException {
         inMemJoiner.closeTable();
-        closeAllSpilledPartitions(SIDE.PROBE);
-        bufferManager.close();
+        framePool.close();
         inMemJoiner = null;
-        bufferManager = null;
         bufferManagerForHashTable = null;
+        partitionsManager.releaseResources();
     }
 
-    /**
-     * In case of failure happens, we need to clear up the generated temporary files.
-     */
-    public void clearProbeTempFiles() throws HyracksDataException {
-        for (int i = 0; i < probeRFWriters.length; i++) {
-            if (probeRFWriters[i] != null) {
-                probeRFWriters[i].erase();
-            }
-        }
-    }
-
-    public RunFileReader getBuildRFReader(int pid) throws HyracksDataException {
-        return ((buildRFWriters[pid] == null) ? null : (buildRFWriters[pid]).createDeleteOnCloseReader());
-    }
-
-    public int getBuildPartitionSizeInTup(int pid) {
-        return (buildPSizeInTups[pid]);
-    }
-
-    public RunFileReader getProbeRFReader(int pid) throws HyracksDataException {
-        return ((probeRFWriters[pid] == null) ? null : (probeRFWriters[pid]).createDeleteOnCloseReader());
-    }
-
-    public int getProbePartitionSizeInTup(int pid) {
-        return (probePSizeInTups[pid]);
-    }
-
-    public int getMaxBuildPartitionSize() {
-        int max = buildPSizeInTups[0];
-        for (int i = 1; i < buildPSizeInTups.length; i++) {
-            if (buildPSizeInTups[i] > max) {
-                max = buildPSizeInTups[i];
-            }
-        }
-        return max;
-    }
-
-    public int getMaxProbePartitionSize() {
-        int max = probePSizeInTups[0];
-        for (int i = 1; i < probePSizeInTups.length; i++) {
-            if (probePSizeInTups[i] > max) {
-                max = probePSizeInTups[i];
-            }
-        }
-        return max;
-    }
-
-    public BitSet getPartitionStatus() {
-        return spilledStatus;
-    }
-
-    public void setIsReversed(boolean b) {
+    void setIsReversed(boolean b) {
         this.isReversed = b;
     }
+
+    HashJoinPartitionsManager getPartitionsManager() {
+        return partitionsManager;
+    }
+
+    private void checkOneFrameReservedPerSpilledPartitions() throws HyracksDataException {
+        //Make sure that there is one frame reserved for each spilled partition.
+        int frameSize = ctx.getInitialFrameSize();
+        if (memSizeInFrames * frameSize
+                - partitionsManager.getSizeOfUnusedMemory() < partitionsManager.getSpilledStatus().cardinality()
+                        * frameSize) {
+            throw new HyracksDataException("There should be at least one frame reserved for each spilled partition.");
+        }
+    }
 }
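
The debug check above reduces to one inequality: the memory handed out so far (budget
minus unused) must cover one reserved frame per spilled partition. As a small example
with assumed numbers, a 10-frame budget of 32 KB frames with 96 KB unused and 4 spilled
partitions is fine, since 320 - 96 = 224 KB in use is at least the 4 * 32 = 128 KB that
must stay reserved. A sketch of the invariant:

    class ReservedFrameInvariantSketch {
        // Mirrors checkOneFrameReservedPerSpilledPartitions(): true iff the used
        // memory covers at least one frame per spilled partition.
        static boolean holds(long budgetFrames, long frameSize, long unusedBytes,
                int spilledPartitions) {
            return budgetFrames * frameSize - unusedBytes >= (long) spilledPartitions * frameSize;
        }
    }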
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 2fd17da..7023a93 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -227,12 +227,14 @@
 
         @Override
         public void toBytes(DataOutput out) throws IOException {
-
+            // Not implemented: this class only stores the state of the build phase
+            // so that it can be used during the probe phase.
         }
 
         @Override
         public void fromBytes(DataInput in) throws IOException {
-
+            // Not implemented: this class only stores the state of the build phase
+            // so that it can be used during the probe phase.
         }
 
     }
@@ -263,7 +265,7 @@
             final ITuplePairComparator probComparator =
                     tuplePairComparatorFactoryProbe2Build.createTuplePairComparator(ctx);
             final IPredicateEvaluator predEvaluator =
-                    (predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator());
+                    predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator();
 
             return new AbstractUnaryInputSinkOperatorNodePushable() {
                 private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(
@@ -305,7 +307,7 @@
                     if (state.hybridHJ != null) {
                         state.hybridHJ.closeBuild();
                         if (isFailed) {
-                            state.hybridHJ.clearBuildTempFiles();
+                            state.hybridHJ.getPartitionsManager().clearBuildTempFiles();
                         } else {
                             ctx.setStateObject(state);
                             if (LOGGER.isTraceEnabled()) {
@@ -369,6 +371,9 @@
 
             IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
                 private BuildAndPartitionTaskState state;
+                /*TODO: Currently the memory for this buffer comes from the Hyracks context,
+                   which means it is not counted against the memory budget and may cause
+                   memory usage to exceed the budget.*/
                 private IFrame rPartbuff = new VSizeFrame(ctx);
 
                 private FrameTupleAppender nullResultAppender = null;
@@ -381,7 +386,6 @@
                             new TaskId(new ActivityId(getOperatorId(), BUILD_AND_PARTITION_ACTIVITY_ID), partition));
 
                     writer.open();
-                    state.hybridHJ.initProbe();
 
                     if (LOGGER.isDebugEnabled()) {
                         LOGGER.debug("OptimizedHybridHashJoin is starting the probe phase.");
@@ -404,8 +408,8 @@
                     if (failed) {
                         try {
                             // Clear temp files if fail() was called.
-                            state.hybridHJ.clearBuildTempFiles();
-                            state.hybridHJ.clearProbeTempFiles();
+                            state.hybridHJ.getPartitionsManager().clearBuildTempFiles();
+                            state.hybridHJ.getPartitionsManager().clearProbeTempFiles();
                         } finally {
                             writer.close(); // writer should always be closed.
                         }
@@ -418,12 +422,12 @@
                         } finally {
                             state.hybridHJ.releaseResource();
                         }
-                        BitSet partitionStatus = state.hybridHJ.getPartitionStatus();
+                        BitSet partitionStatus = state.hybridHJ.getPartitionsManager().getSpilledStatus();
                         rPartbuff.reset();
                         for (int pid = partitionStatus.nextSetBit(0); pid >= 0; pid =
                                 partitionStatus.nextSetBit(pid + 1)) {
-                            RunFileReader bReader = state.hybridHJ.getBuildRFReader(pid);
-                            RunFileReader pReader = state.hybridHJ.getProbeRFReader(pid);
+                            RunFileReader bReader = state.hybridHJ.getPartitionsManager().getBuildRFReader(pid);
+                            RunFileReader pReader = state.hybridHJ.getPartitionsManager().getProbeRFReader(pid);
 
                             if (bReader == null || pReader == null) {
                                 if (isLeftOuter && pReader != null) {
@@ -431,8 +435,10 @@
                                 }
                                 continue;
                             }
-                            int bSize = state.hybridHJ.getBuildPartitionSizeInTup(pid);
-                            int pSize = state.hybridHJ.getProbePartitionSizeInTup(pid);
+                            int bSize = state.hybridHJ.getPartitionsManager().getTuplesCount(pid,
+                                    OptimizedHybridHashJoin.SIDE.BUILD);
+                            int pSize = state.hybridHJ.getPartitionsManager().getTuplesCount(pid,
+                                    OptimizedHybridHashJoin.SIDE.PROBE);
                             joinPartitionPair(bReader, pReader, bSize, pSize, 1);
                         }
                     } catch (Exception e) {
@@ -440,8 +446,8 @@
                         // to send the failure signal to the downstream, when there is a throwable thrown.
                         writer.fail();
                         // Clear temp files, as neither this.fail() nor this.close() will be called after close().
-                        state.hybridHJ.clearBuildTempFiles();
-                        state.hybridHJ.clearProbeTempFiles();
+                        state.hybridHJ.getPartitionsManager().clearBuildTempFiles();
+                        state.hybridHJ.getPartitionsManager().clearProbeTempFiles();
                         // Re-throw whatever is caught.
                         throw e;
                     } finally {
@@ -587,7 +593,6 @@
                         probeSideReader.open();
                         rPartbuff.reset();
                         try {
-                            rHHj.initProbe();
                             while (probeSideReader.nextFrame(rPartbuff)) {
                                 rHHj.probe(rPartbuff.getBuffer(), writer);
                             }
@@ -601,11 +606,13 @@
                     }
 
                     try {
-                        int maxAfterBuildSize = rHHj.getMaxBuildPartitionSize();
-                        int maxAfterProbeSize = rHHj.getMaxProbePartitionSize();
+                        int maxAfterBuildSize =
+                                rHHj.getPartitionsManager().getMaxPartitionSize(OptimizedHybridHashJoin.SIDE.BUILD);
+                        int maxAfterProbeSize =
+                                rHHj.getPartitionsManager().getMaxPartitionSize(OptimizedHybridHashJoin.SIDE.PROBE);
                         int afterMax = Math.max(maxAfterBuildSize, maxAfterProbeSize);
 
-                        BitSet rPStatus = rHHj.getPartitionStatus();
+                        BitSet rPStatus = rHHj.getPartitionsManager().getSpilledStatus();
                         if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) {
                             //Case 2.1.1 - Keep applying HHJ
                             if (LOGGER.isDebugEnabled()) {
@@ -613,10 +620,12 @@
                                         + "(isLeftOuter || build<probe) - [Level " + level + "]");
                             }
                             for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
-                                RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
-                                RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
-                                int rbSizeInTuple = rHHj.getBuildPartitionSizeInTup(rPid);
-                                int rpSizeInTuple = rHHj.getProbePartitionSizeInTup(rPid);
+                                RunFileReader rbrfw = rHHj.getPartitionsManager().getBuildRFReader(rPid);
+                                RunFileReader rprfw = rHHj.getPartitionsManager().getProbeRFReader(rPid);
+                                int rbSizeInTuple = rHHj.getPartitionsManager().getTuplesCount(rPid,
+                                        OptimizedHybridHashJoin.SIDE.BUILD);
+                                int rpSizeInTuple = rHHj.getPartitionsManager().getTuplesCount(rPid,
+                                        OptimizedHybridHashJoin.SIDE.PROBE);
 
                                 if (rbrfw == null || rprfw == null) {
                                     if (isLeftOuter && rprfw != null) {
@@ -639,8 +648,8 @@
                                         + "(isLeftOuter || build<probe) - [Level " + level + "]");
                             }
                             for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
-                                RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
-                                RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
+                                RunFileReader rbrfw = rHHj.getPartitionsManager().getBuildRFReader(rPid);
+                                RunFileReader rprfw = rHHj.getPartitionsManager().getProbeRFReader(rPid);
 
                                 if (rbrfw == null || rprfw == null) {
                                     if (isLeftOuter && rprfw != null) {
@@ -650,8 +659,10 @@
                                     continue;
                                 }
 
-                                int buildSideInTups = rHHj.getBuildPartitionSizeInTup(rPid);
-                                int probeSideInTups = rHHj.getProbePartitionSizeInTup(rPid);
+                                int buildSideInTups = rHHj.getPartitionsManager().getTuplesCount(rPid,
+                                        OptimizedHybridHashJoin.SIDE.BUILD);
+                                int probeSideInTups = rHHj.getPartitionsManager().getTuplesCount(rPid,
+                                        OptimizedHybridHashJoin.SIDE.PROBE);
                                 // NLJ order is outer + inner, the order is reversed from the other joins
                                 if (isLeftOuter || probeSideInTups < buildSideInTups) {
                                     //checked-modified
@@ -665,14 +676,17 @@
                     } catch (Exception e) {
                         // Make sure that temporary run files generated in recursive hybrid hash joins
                         // are closed and deleted.
-                        rHHj.clearBuildTempFiles();
-                        rHHj.clearProbeTempFiles();
+                        rHHj.getPartitionsManager().clearBuildTempFiles();
+                        rHHj.getPartitionsManager().clearProbeTempFiles();
                         throw e;
                     }
                 }
 
                 private void appendNullToProbeTuples(RunFileReader probReader) throws HyracksDataException {
                     if (nullResultAppender == null) {
+                        /*TODO: Currently the memory for this buffer comes from the Hyracks context,
+                           which means it is not counted against the memory budget and may cause
+                           memory usage to exceed the budget.*/
                         nullResultAppender = new FrameTupleAppender(new VSizeFrame(ctx));
                     }
                     if (probeTupleAccessor == null) {
@@ -769,7 +783,9 @@
                             new NestedLoopJoin(ctx, new FrameTupleAccessor(outerRd), new FrameTupleAccessor(innerRd),
                                     nljComptorOuterInner, memorySize, predEvaluator, isLeftOuter, nonMatchWriter);
                     nlj.setIsReversed(isReversed);
-
+                    /*TODO: Currently the memory for this buffer comes from the Hyracks context,
+                       which means it is not counted against the memory budget and may cause
+                       memory usage to exceed the budget.*/
                     IFrame cacheBuff = new VSizeFrame(ctx);
                     try {
                         innerReader.open();
@@ -785,6 +801,9 @@
                         }
                     }
                     try {
+                        /*TODO: Currently the memory for this buffer comes from the Hyracks context,
+                           which means it is not counted against the memory budget and may cause
+                           memory usage to exceed the budget.*/
                         IFrame joinBuff = new VSizeFrame(ctx);
                         outerReader.open();
                         try {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
index e6da7c9..71e5fd0 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
@@ -48,15 +48,16 @@
             ISimpleFrameBufferManager bufferManager, double garbageCollectionThreshold) throws HyracksDataException {
         super(tableSize, ctx, false);
         this.bufferManager = bufferManager;
-
-        ByteBuffer newFrame = getFrame(frameSize);
-        if (newFrame == null) {
-            throw new HyracksDataException("Can't allocate a frame for Hash Table. Please allocate more budget.");
+        if (tableSize > 0) {
+            ByteBuffer newFrame = getFrame(frameSize);
+            if (newFrame == null) {
+                throw new HyracksDataException("Can't allocate a frame for Hash Table. Please allocate more budget.");
+            }
+            IntSerDeBuffer frame = new IntSerDeBuffer(newFrame);
+            frameCapacity = frame.capacity();
+            contents.add(frame);
+            currentOffsetInEachFrameList.add(0);
         }
-        IntSerDeBuffer frame = new IntSerDeBuffer(newFrame);
-        frameCapacity = frame.capacity();
-        contents.add(frame);
-        currentOffsetInEachFrameList.add(0);
         this.garbageCollectionThreshold = garbageCollectionThreshold;
     }
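
The guard above means a SerializableHashTable created with tableSize == 0 no longer
pre-allocates its first frame, so an empty table costs nothing against the budget. A
minimal sketch of that pattern, with allocate() as a hypothetical stand-in for the
budgeted buffer manager:

    class ZeroSizeTableSketch {
        java.nio.ByteBuffer firstFrame; // stays null for an empty table

        ZeroSizeTableSketch(int tableSize, int frameSize) {
            if (tableSize > 0) {
                firstFrame = allocate(frameSize); // only pay for a frame when there are entries
            }
        }

        static java.nio.ByteBuffer allocate(int frameSize) {
            return java.nio.ByteBuffer.allocate(frameSize); // stand-in for the budgeted pool
        }
    }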
 
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/join/HashJoinPartitionsManagerTest.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/join/HashJoinPartitionsManagerTest.java
new file mode 100644
index 0000000..bbea993
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/join/HashJoinPartitionsManagerTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.join;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import org.apache.hyracks.api.context.IHyracksJobletContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.io.RunFileWriter;
+import org.apache.hyracks.dataflow.std.buffermanager.PreferToSpillFullyOccupiedFramePolicy;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class HashJoinPartitionsManagerTest {
+    HashJoinPartitionsManager partitionsManager;
+    static IHyracksTaskContext ctx;
+    String buildRel = "BUILD";
+    String probeRel = "PROBE";
+    static FrameTupleAccessor accessorBuild;
+    static FrameTupleAccessor accessorProbe;
+    static IHyracksJobletContext jobletContext;
+    static IOManager ioManager;
+    static FileReference file;
+
+    @BeforeClass
+    public static void classSetUp() throws HyracksDataException {
+        file = Mockito.mock(FileReference.class);
+        ioManager = Mockito.mock(IOManager.class);
+        ctx = Mockito.mock(IHyracksTaskContext.class);
+        jobletContext = Mockito.mock(IHyracksJobletContext.class);
+        Mockito.when(jobletContext.createManagedWorkspaceFile(Mockito.anyString())).thenReturn(file);
+        Mockito.when(ctx.getJobletContext()).thenReturn(jobletContext);
+        Mockito.when(ctx.getIoManager()).thenReturn(ioManager);
+        accessorBuild = Mockito.mock(FrameTupleAccessor.class);
+        accessorProbe = Mockito.mock(FrameTupleAccessor.class);
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        partitionsManager = new HashJoinPartitionsManager(ctx, 10, 8, accessorBuild, accessorProbe, buildRel, probeRel);
+        partitionsManager.init();
+    }
+
+    @Test
+    public void incrementTupleCount_Build() throws HyracksDataException {
+        assert partitionsManager.getTuplesCount(1, OptimizedHybridHashJoin.SIDE.BUILD) == 0;
+        assert partitionsManager.getTuplesCount(1, OptimizedHybridHashJoin.SIDE.PROBE) == 0;
+        for (int i = 0; i < 5; i++) {
+            partitionsManager.incrementTupleCount(1, OptimizedHybridHashJoin.SIDE.BUILD);
+        }
+        assert partitionsManager.getTuplesCount(1, OptimizedHybridHashJoin.SIDE.BUILD) == 5;
+        assert partitionsManager.getTuplesCount(1, OptimizedHybridHashJoin.SIDE.PROBE) == 0;
+    }
+
+    @Test
+    public void incrementTupleCount_Probe() throws HyracksDataException {
+        assert partitionsManager.getTuplesCount(1, OptimizedHybridHashJoin.SIDE.BUILD) == 0;
+        assert partitionsManager.getTuplesCount(1, OptimizedHybridHashJoin.SIDE.PROBE) == 0;
+        for (int i = 0; i < 5; i++) {
+            partitionsManager.incrementTupleCount(1, OptimizedHybridHashJoin.SIDE.PROBE);
+        }
+        assert partitionsManager.getTuplesCount(1, OptimizedHybridHashJoin.SIDE.PROBE) == 5;
+        assert partitionsManager.getTuplesCount(1, OptimizedHybridHashJoin.SIDE.BUILD) == 0;
+    }
+
+    @Test(expected = HyracksDataException.class)
+    public void closeBuildPartitionWriter() throws HyracksDataException {
+        partitionsManager.closeBuildPartitionWriter(1);
+    }
+
+    @Test
+    public void clearBuildTempFiles() throws HyracksDataException {
+        RunFileWriter rfw = Mockito.mock(RunFileWriter.class);
+        partitionsManager.setRunFileWriter(1, OptimizedHybridHashJoin.SIDE.BUILD, rfw);
+        partitionsManager.clearBuildTempFiles();
+        Mockito.verify(rfw).erase();
+    }
+
+    @Test
+    public void clearProbeTempFiles() throws HyracksDataException {
+        RunFileWriter rfw = Mockito.mock(RunFileWriter.class);
+        partitionsManager.setRunFileWriter(1, OptimizedHybridHashJoin.SIDE.PROBE, rfw);
+        partitionsManager.clearProbeTempFiles();
+        Mockito.verify(rfw).erase();
+    }
+
+    @Test(expected = HyracksDataException.class)
+    public void selectAndSpillVictim_NoSpaceLeftException() throws Throwable {
+        try {
+            Class<?> cls = partitionsManager.getClass();
+            Method method = cls.getDeclaredMethod("selectAndSpillVictim", int.class);
+            method.setAccessible(true);
+            Field spillPolicy = cls.getDeclaredField("spillPolicy");
+            spillPolicy.setAccessible(true);
+            PreferToSpillFullyOccupiedFramePolicy spp =
+                    (PreferToSpillFullyOccupiedFramePolicy) spillPolicy.get(partitionsManager);
+            method.invoke(partitionsManager, 2);
+            Mockito.verify(spp).selectVictimPartition(2);
+        } catch (InvocationTargetException e) {
+            throw e.getTargetException();
+        }
+    }
+}

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/3454
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-MessageType: newchange
Gerrit-Change-Id: I25e26bb499a42077a611ebbebf56075704fc5e74
Gerrit-Change-Number: 3454
Gerrit-PatchSet: 1
Gerrit-Owner: Shiva Jahangiri <sh...@uci.edu>

Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.

Posted by "Jenkins (Code Review)" <de...@asterixdb.apache.org>.
Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )

Change subject: NoGrow-NoSteal algorithm for large object in hybrid hash join.
......................................................................


Patch Set 1: Contrib+1

BAD Compatibility Tests Successful

https://asterix-jenkins.ics.uci.edu/job/asterixbad-compat/4592/ : SUCCESS



Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.

Posted by "Jenkins (Code Review)" <de...@asterixdb.apache.org>.
Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )

Change subject: NoGrow-NoSteal algorithm for large object in hybrid hash join.
......................................................................


Patch Set 1:

Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-cancellation-test/6014/ (2/16)



Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.

Posted by "Jenkins (Code Review)" <de...@asterixdb.apache.org>.
Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )

Change subject: NoGrow-NoSteal algorithm for large object in hybrid hash join.
......................................................................


Patch Set 1:

Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-verify-no-installer-app/6151/ (15/16)



Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.

Posted by "Jenkins (Code Review)" <de...@asterixdb.apache.org>.
Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )

Change subject: NoGrow-NoSteal algorithm for large object in hybrid hash join.
......................................................................


Patch Set 1:

Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-ssl-compression/768/ (16/16)



Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.

Posted by "Jenkins (Code Review)" <de...@asterixdb.apache.org>.
Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )

Change subject: NoGrow-NoSteal algorithm for large object in hybrid hash join.
......................................................................


Patch Set 1:

Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-sonar/9955/ (7/16)



Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.

Posted by "Jenkins (Code Review)" <de...@asterixdb.apache.org>.
Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )

Change subject: NoGrow-NoSteal algorithm for large object in hybrid hash join.
......................................................................


Patch Set 1:

Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-verify-storage/6573/ (8/16)



Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.

Posted by "Anon. E. Moose (Code Review)" <de...@asterixdb.apache.org>.
Anon. E. Moose (1000171) has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )

Change subject: NoGrow-NoSteal algorithm for large object in hybrid hash join.
......................................................................


Patch Set 1:

Analytics Compatibility Compilation Successful
https://cbjenkins.page.link/wUJxRFW7ZxXHMBLh6 : SUCCESS



Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.

Posted by "Jenkins (Code Review)" <de...@asterixdb.apache.org>.
Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )

Change subject: NoGrow-NoSteal algorithm for large object in hybrid hash join.
......................................................................


Patch Set 1:

Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-asterix-app-openjdk11/1358/ (6/16)



Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.

Posted by "Jenkins (Code Review)" <de...@asterixdb.apache.org>.
Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )

Change subject: NoGrow-NoSteal algorithm for large object in hybrid hash join.
......................................................................


Patch Set 1:

Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-verify-txnlog/1169/ (11/16)



Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.

Posted by "Jenkins (Code Review)" <de...@asterixdb.apache.org>.
Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )

Change subject: NoGrow-NoSteal algorithm for large object in hybrid hash join.
......................................................................


Patch Set 1:

Integration Tests Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-integration-tests/8767/



Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.

Posted by "Jenkins (Code Review)" <de...@asterixdb.apache.org>.
Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )

Change subject: NoGrow-NoSteal algorithm for large object in hybrid hash join.
......................................................................


Patch Set 1:

BAD Compatibility Tests Started https://asterix-jenkins.ics.uci.edu/job/asterixbad-compat/4592/



Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.

Posted by "Jenkins (Code Review)" <de...@asterixdb.apache.org>.
Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )

Change subject: NoGrow-NoSteal algorithm for large object in hybrid hash join.
......................................................................


Patch Set 1:

Build Started https://asterix-jenkins.ics.uci.edu/job/hyracks-gerrit/5910/ (1/16)



Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.

Posted by "Jenkins (Code Review)" <de...@asterixdb.apache.org>.
Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )

Change subject: NoGrow-NoSteal algorithm for large object in hybrid hash join.
......................................................................


Patch Set 1:

Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-asterix-app-sql-execution/5991/ (13/16)



Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.

Posted by "Jenkins (Code Review)" <de...@asterixdb.apache.org>.
Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )

Change subject: NoGrow-NoSteal algorithm for large object in hybrid hash join.
......................................................................


Patch Set 1:

Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-spidersilk-tests/928/ (14/16)



Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.

Posted by "Jenkins (Code Review)" <de...@asterixdb.apache.org>.
Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )

Change subject: NoGrow-NoSteal algorithm for large object in hybrid hash join.
......................................................................


Patch Set 1: Integration-Tests+1

Integration Tests Successful

https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-integration-tests/8767/ : SUCCESS



Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.

Posted by "Jenkins (Code Review)" <de...@asterixdb.apache.org>.
Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )

Change subject: NoGrow-NoSteal algorithm for large object in hybrid hash join.
......................................................................


Patch Set 1:

Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-verify-asterix-app/6362/ (12/16)



Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.

Posted by "Jenkins (Code Review)" <de...@asterixdb.apache.org>.
Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )

Change subject: NoGrow-NoSteal algorithm for large object in hybrid hash join.
......................................................................


Patch Set 1:

Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-ensure-ancestor/3996/ (9/16)



Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.

Posted by "Jenkins (Code Review)" <de...@asterixdb.apache.org>.
Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )

Change subject: NoGrow-NoSteal algorithm for large object in hybrid hash join.
......................................................................


Patch Set 1:

Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-source-format/5974/ (3/16)



Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.

Posted by "Jenkins (Code Review)" <de...@asterixdb.apache.org>.
Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )

Change subject: NoGrow-NoSteal algorithm for large object in hybrid hash join.
......................................................................


Patch Set 1:

Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-source-assemblies/6212/ (5/16)



Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.

Posted by "Jenkins (Code Review)" <de...@asterixdb.apache.org>.
Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )

Change subject: NoGrow-NoSteal algorithm for large object in hybrid hash join.
......................................................................


Patch Set 1:

Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-stabilization-f69489-compat/1262/ (10/16)



Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.

Posted by "Anon. E. Moose (Code Review)" <de...@asterixdb.apache.org>.
Anon. E. Moose (1000171) has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )

Change subject: NoGrow-NoSteal algorithm for large object in hybrid hash join.
......................................................................


Patch Set 1: Contrib-2

Analytics Compatibility Tests Failed
https://cbjenkins.page.link/AVzcjPAGtg1j35dc6 : UNSTABLE



Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.

Posted by "Jenkins (Code Review)" <de...@asterixdb.apache.org>.
Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )

Change subject: NoGrow-NoSteal algorithm for large object in hybrid hash join.
......................................................................


Patch Set 1:

Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-notopic/11482/ (4/16)

