Posted to notifications@asterixdb.apache.org by "Shiva Jahangiri (Code Review)" <de...@asterixdb.apache.org> on 2019/06/23 01:42:12 UTC
Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.
Shiva Jahangiri has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/3454 )
Change subject: NoGrow-NoSteal algorithm for large object in hybrid hash join.
......................................................................
NoGrow-NoSteal algorithm for large object in hybrid hash join.
Change-Id: I25e26bb499a42077a611ebbebf56075704fc5e74
---
M hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/DeallocatableFramePool.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IDeallocatableFramePool.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
A hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HashJoinPartitionsManager.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
A hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/join/HashJoinPartitionsManagerTest.java
10 files changed, 985 insertions(+), 398 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/54/3454/1
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml b/hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml
index abaf146..0713725 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml
@@ -99,5 +99,10 @@
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/DeallocatableFramePool.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/DeallocatableFramePool.java
index 47b11ce..3bc4fdc 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/DeallocatableFramePool.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/DeallocatableFramePool.java
@@ -29,7 +29,7 @@
public class DeallocatableFramePool implements IDeallocatableFramePool {
private final IHyracksFrameMgrContext ctx;
- private final int memBudget;
+ private int memBudget;
private int allocated;
private LinkedList<ByteBuffer> buffers;
@@ -125,4 +125,27 @@
allocated = 0;
buffers.clear();
}
+
+ @Override
+ public void releaseUnusedMemory() {
+ for (Iterator<ByteBuffer> iter = buffers.iterator(); iter.hasNext();) {
+ ByteBuffer next = iter.next();
+ allocated -= next.capacity();
+ ctx.deallocateFrames(next.capacity());
+ iter.remove();
+ }
+ buffers.clear();
+ }
+
+ @Override
+ public int getSizeOfAllocatedMemory() {
+ return allocated;
+ }
+
+ @Override
+ public void decreaseMemoryBudget(int budget) {
+ assert budget >= allocated : "The new memory budget cannot be lower than what has been already allocated.";
+ memBudget = budget;
+ }
+
}
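The accounting behind releaseUnusedMemory() and decreaseMemoryBudget() can be made concrete with a minimal, hypothetical model. MiniFramePool is not a Hyracks class. It assumes that `allocated` counts every frame obtained from the context, including freed frames the pool keeps in `buffers` for reuse. This is inferred from the loop above, which subtracts each cached buffer from `allocated` as it releases it. The model only handles standard-size frames and tracks them by size alone. It also throws where the patch uses `assert`, because Java assertions are disabled unless the JVM runs with -ea.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified model of a deallocatable frame pool (not a Hyracks class).
// Frames are represented only by their size in bytes.
class MiniFramePool {
    private final int frameSize;
    private int memBudget;
    // Bytes obtained from the (imaginary) global allocator, including cached free frames.
    private int allocated;
    // Freed frames kept for reuse; they still count toward 'allocated'.
    private final Deque<Integer> cachedFreeFrames = new ArrayDeque<>();

    MiniFramePool(int frameSize, int memBudget) {
        this.frameSize = frameSize;
        this.memBudget = memBudget;
    }

    // Returns the size of the frame handed out, or -1 if the budget is exhausted.
    int allocateFrame() {
        if (!cachedFreeFrames.isEmpty()) {
            return cachedFreeFrames.pop(); // reuse a cached frame; 'allocated' does not change
        }
        if (allocated + frameSize > memBudget) {
            return -1;
        }
        allocated += frameSize;
        return frameSize;
    }

    // Keeps the freed frame cached instead of returning it to the global allocator.
    void deallocateFrame(int size) {
        cachedFreeFrames.push(size);
    }

    // Counterpart of releaseUnusedMemory(): hands every cached free frame back.
    void releaseUnusedMemory() {
        while (!cachedFreeFrames.isEmpty()) {
            allocated -= cachedFreeFrames.pop();
        }
    }

    int getSizeOfAllocatedMemory() {
        return allocated;
    }

    // Counterpart of decreaseMemoryBudget(): the budget may not drop below what is allocated.
    void decreaseMemoryBudget(int budget) {
        if (budget < allocated) {
            throw new IllegalArgumentException("budget " + budget + " < allocated " + allocated);
        }
        memBudget = budget;
    }

    public static void main(String[] args) {
        MiniFramePool pool = new MiniFramePool(32_768, 4 * 32_768);
        pool.allocateFrame();
        int second = pool.allocateFrame();
        pool.allocateFrame();
        pool.deallocateFrame(second); // cached, but still counted as allocated
        System.out.println(pool.getSizeOfAllocatedMemory()); // 98304
        pool.releaseUnusedMemory();
        System.out.println(pool.getSizeOfAllocatedMemory()); // 65536
        pool.decreaseMemoryBudget(pool.getSizeOfAllocatedMemory());
        System.out.println(pool.allocateFrame()); // -1, the shrunken budget is used up
    }
}
```

Releasing cached frames first and then shrinking the budget to the allocated size is what lets memory that the build phase no longer needs be handed back instead of staying reserved by the pool.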
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IDeallocatableFramePool.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IDeallocatableFramePool.java
index 39426c1..54168ef 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IDeallocatableFramePool.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IDeallocatableFramePool.java
@@ -25,4 +25,10 @@
void deAllocateBuffer(ByteBuffer buffer);
+ void releaseUnusedMemory();
+
+ int getSizeOfAllocatedMemory();
+
+ void decreaseMemoryBudget(int budget);
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
index 6c08be2..a529318 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
@@ -37,14 +37,16 @@
* Insert tuple from (byte[] byteArray,int[] fieldEndOffsets, int start, int size) into
* specified partition. The handle is written into the tuplepointer.
* <br>
- * If {@code byteArray} contains the {@code fieldEndOffsets} already, then please set the {@code fieldEndOffsets} as NULL
+ * If {@code byteArray} contains the {@code fieldEndOffsets} already, then please set the
+ * {@code fieldEndOffsets} as NULL
*
* @param partition
* the id of the partition to insert the tuple into
* @param byteArray
* the byteArray which contains the tuple
* @param fieldEndOffsets
- * the fieldEndOffsets which comes from the ArrayTupleBuilder, please set it to NULL if the {@code byteArray} already contains the fieldEndOffsets
+ * the fieldEndOffsets which comes from the ArrayTupleBuilder, please set it to
+ * NULL if the {@code byteArray} already contains the fieldEndOffsets
* @param start
* the start offset in the {@code byteArray}
* @param size
@@ -67,7 +69,7 @@
* @param tupleId
* the id of the tuple from the tupleAccessor
* @param pointer
- * the returned pointer indicating the handler to later fetch the tuple from the buffer maanager
+ * the returned pointer indicating the handler to later fetch the tuple from the buffer manager
* @return true if the insertion succeed. Otherwise return false.
* @throws HyracksDataException
*/
@@ -80,7 +82,8 @@
void cancelInsertTuple(int partition) throws HyracksDataException;
/**
- * Reset to the initial states. The previous allocated resources won't be released in order to be used in the next round.
+ * Reset to the initial states. The previous allocated resources won't be released in order to be used in the
+ * next round.
*
* @throws HyracksDataException
*/
@@ -113,4 +116,15 @@
*/
void clearPartition(int partition) throws HyracksDataException;
+ /**
+ * Releases the frames that have been allocated but are not currently in use.
+ */
+ void releaseUnusedMemory();
+
+ /**
+ * Returns the total size of the allocated frames, in bytes.
+ * @return size of the allocated memory in bytes.
+ */
+ int getSizeOfAllocatedMemory();
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
index 4578c2e..c515fb2 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
@@ -204,7 +204,7 @@
private int appendTupleToBuffer(BufferInfo bufferInfo, int[] fieldEndOffsets, byte[] byteArray, int start, int size)
throws HyracksDataException {
- assert (bufferInfo.getStartOffset() == 0) : "Haven't supported yet in FrameTupleAppender";
+ assert bufferInfo.getStartOffset() == 0 : "Haven't supported yet in FrameTupleAppender";
if (bufferInfo.getBuffer() != appendFrame.getBuffer()) {
appendFrame.reset(bufferInfo.getBuffer());
appender.reset(appendFrame, false);
@@ -288,4 +288,14 @@
}
+ @Override
+ public void releaseUnusedMemory() {
+ framePool.releaseUnusedMemory();
+ }
+
+ @Override
+ public int getSizeOfAllocatedMemory() {
+ return framePool.getSizeOfAllocatedMemory();
+ }
+
}
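Once the buffer manager exposes getSizeOfAllocatedMemory(), the budget arithmetic in HashJoinPartitionsManager.getSizeOfUnusedMemory() and updateMemoryBudget(), added later in this change, reduces to a couple of lines. The sketch below reproduces it in a hypothetical helper, JoinBudgetMath. The numbers are made up: 64 frames of 32 KiB, 40 frames allocated, and 10 spilled partitions. The helper uses long, whereas the patch uses int. An int product such as memSizeInFrames * frameSize overflows once the budget reaches 2 GiB.

```java
// Hypothetical helper (not part of Hyracks) reproducing the budget arithmetic of
// HashJoinPartitionsManager.getSizeOfUnusedMemory() and updateMemoryBudget(), using long to avoid overflow.
final class JoinBudgetMath {
    private JoinBudgetMath() {
    }

    // Join memory that is neither allocated nor reserved (one frame is reserved per spilled partition).
    static long unusedBytes(int memSizeInFrames, int frameSize, long allocatedBytes, int numSpilled) {
        long total = (long) memSizeInFrames * frameSize;
        long reservedForSpilled = (long) numSpilled * frameSize;
        return total - (allocatedBytes + reservedForSpilled);
    }

    // The new frame pool budget: total minus unused, i.e. allocated plus the per-spilled-partition reserve.
    static long shrunkenBudget(int memSizeInFrames, int frameSize, long allocatedBytes, int numSpilled) {
        long total = (long) memSizeInFrames * frameSize;
        return total - unusedBytes(memSizeInFrames, frameSize, allocatedBytes, numSpilled);
    }

    public static void main(String[] args) {
        int frameSize = 32_768;
        long allocated = 40L * frameSize; // 40 frames currently held by the buffer manager
        System.out.println(unusedBytes(64, frameSize, allocated, 10)); // 458752 (14 frames)
        System.out.println(shrunkenBudget(64, frameSize, allocated, 10)); // 1638400 (50 frames)
    }
}
```

The new budget equals the allocated bytes plus one frame per spilled partition. It therefore never falls below what is allocated, so the assertion in decreaseMemoryBudget() should hold.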
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HashJoinPartitionsManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HashJoinPartitionsManager.java
new file mode 100644
index 0000000..b384ada
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HashJoinPartitionsManager.java
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.join;
+
+import java.util.Arrays;
+import java.util.BitSet;
+
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.io.RunFileReader;
+import org.apache.hyracks.dataflow.common.io.RunFileWriter;
+import org.apache.hyracks.dataflow.std.buffermanager.DeallocatableFramePool;
+import org.apache.hyracks.dataflow.std.buffermanager.IDeallocatableFramePool;
+import org.apache.hyracks.dataflow.std.buffermanager.IPartitionedTupleBufferManager;
+import org.apache.hyracks.dataflow.std.buffermanager.PreferToSpillFullyOccupiedFramePolicy;
+import org.apache.hyracks.dataflow.std.buffermanager.VPartitionTupleBufferManager;
+import org.apache.hyracks.dataflow.std.structures.TuplePointer;
+
+public class HashJoinPartitionsManager {
+
+ private final BitSet spilledStatus; //0=resident, 1=spilled
+ private final int numOfPartitions;
+ private int[] buildPSizeInTups;
+ private int[] probePSizeInTups;
+ private RunFileWriter[] buildRFWriters; //writing spilled build partitions
+ private RunFileWriter[] probeRFWriters; //writing spilled probe partitions
+ private final String buildRelName;
+ private final String probeRelName;
+ private final IHyracksTaskContext ctx;
+ private final int memSizeInFrames;
+ private IPartitionedTupleBufferManager bufferManager;
+ private PreferToSpillFullyOccupiedFramePolicy spillPolicy;
+ private IFrame reloadBuffer;
+ // This is a reusable object to store the pointer.
+ private TuplePointer tempPtr = new TuplePointer();
+ // We mainly use it to match the corresponding function signature.
+ private final FrameTupleAccessor accessorBuild;
+ private final FrameTupleAccessor accessorProbe;
+ // Used to write big objects (build or probe) that do not fit in the join memory directly to disk
+ private FrameTupleAppender bigFrameAppender;
+ private IDeallocatableFramePool framePool;
+
+ public HashJoinPartitionsManager(IHyracksTaskContext ctx, int memSizeInFrames, int numOfPartitions,
+ FrameTupleAccessor accessorBuild, FrameTupleAccessor accessorProbe, String buildRelName,
+ String probeRelName) {
+ this.numOfPartitions = numOfPartitions;
+ this.spilledStatus = new BitSet(numOfPartitions);
+ this.buildPSizeInTups = new int[numOfPartitions];
+ this.probePSizeInTups = new int[numOfPartitions];
+ this.buildRFWriters = new RunFileWriter[numOfPartitions];
+ this.probeRFWriters = new RunFileWriter[numOfPartitions];
+ this.buildRelName = buildRelName;
+ this.probeRelName = probeRelName;
+ this.ctx = ctx;
+ this.memSizeInFrames = memSizeInFrames;
+ this.accessorBuild = accessorBuild;
+ this.accessorProbe = accessorProbe;
+ }
+
+ /**
+ * Initializes some data structures, such as the frame pool, buffer manager, and spill policy.
+ * @throws HyracksDataException
+ */
+ void init() throws HyracksDataException {
+ framePool = new DeallocatableFramePool(ctx, memSizeInFrames * ctx.getInitialFrameSize());
+ bufferManager = new VPartitionTupleBufferManager(
+ PreferToSpillFullyOccupiedFramePolicy.createAtMostOneFrameForSpilledPartitionConstrain(spilledStatus),
+ numOfPartitions, framePool);
+ spillPolicy = new PreferToSpillFullyOccupiedFramePolicy(bufferManager, spilledStatus);
+ }
+
+ /**
+ * Increments the tuple count of the partition with id = pid on the given side.
+ * @param pid partition id.
+ * @param whichSide the phase (build or probe).
+ */
+ public void incrementTupleCount(int pid, OptimizedHybridHashJoin.SIDE whichSide) {
+ confirmValidPartitionAccess(pid);
+ switch (whichSide) {
+ case BUILD:
+ buildPSizeInTups[pid]++;
+ break;
+ case PROBE:
+ probePSizeInTups[pid]++;
+ break;
+ }
+ }
+
+ /**
+ * Returns whether the partition with id = pid is spilled.
+ * @param pid partition id
+ * @return true if partition is spilled.
+ */
+ boolean isPartitionSpilled(int pid) {
+ confirmValidPartitionAccess(pid);
+ return spilledStatus.get(pid);
+ }
+
+ /**
+ * Returns whether all build partitions are resident in memory.
+ * @return true if all of build partitions are in memory.
+ */
+ boolean isBuildRelAllInMemory() {
+ return spilledStatus.nextSetBit(0) < 0;
+ }
+
+ /**
+ * Closes the build writer for partition with id = pid.
+ * @param pid partition id
+ * @throws HyracksDataException
+ */
+ void closeBuildPartitionWriter(int pid) throws HyracksDataException {
+ confirmValidPartitionAccess(pid);
+ if (buildRFWriters[pid] == null) {
+ throw new HyracksDataException("Tried to close a non-existing file writer.");
+ }
+ buildRFWriters[pid].close();
+ }
+
+ /**
+ * If a failure happens, the generated temporary build files need to be cleaned up.
+ */
+ public void clearBuildTempFiles() throws HyracksDataException {
+ for (RunFileWriter buildRFWriter : buildRFWriters) {
+ if (buildRFWriter != null) {
+ buildRFWriter.erase();
+ }
+ }
+ }
+
+ /**
+ * If a failure happens, the generated temporary probe files need to be cleaned up.
+ */
+ public void clearProbeTempFiles() throws HyracksDataException {
+ for (RunFileWriter probeRFWriter : probeRFWriters) {
+ if (probeRFWriter != null) {
+ probeRFWriter.erase();
+ }
+ }
+ }
+
+ /**
+ * Closes all of the run file writers of the given side (phase).
+ * @param whichSide Build or Probe
+ * @throws HyracksDataException
+ */
+ private void closeAllSpilledPartitionWriters(OptimizedHybridHashJoin.SIDE whichSide) throws HyracksDataException {
+ RunFileWriter[] runFileWriters = null;
+ switch (whichSide) {
+ case BUILD:
+ runFileWriters = buildRFWriters;
+ break;
+ case PROBE:
+ runFileWriters = probeRFWriters;
+ break;
+ }
+ if (runFileWriters != null) {
+ for (RunFileWriter runFileWriter : runFileWriters) {
+ if (runFileWriter != null) {
+ runFileWriter.close();
+ }
+ }
+ }
+ }
+
+ /**
+ * Gets the run file writer for the given partition and phase; if none exists yet, creates one and
+ * assigns it to that partition and phase.
+ * @param pid partition id
+ * @param whichSide Build or Probe
+ * @return RunFileWriter for the given partition and given phase
+ * @throws HyracksDataException
+ */
+ RunFileWriter getRunFileWriterOrCreateNewOneIfNotExist(int pid, OptimizedHybridHashJoin.SIDE whichSide)
+ throws HyracksDataException {
+ confirmValidPartitionAccess(pid);
+ RunFileWriter[] runFileWriters = null;
+ String refName = null;
+ switch (whichSide) {
+ case BUILD:
+ runFileWriters = buildRFWriters;
+ refName = buildRelName;
+ break;
+ case PROBE:
+ runFileWriters = probeRFWriters;
+ refName = probeRelName;
+ break;
+ }
+ assert runFileWriters != null : "Runfilewriter for side " + whichSide.toString() + " is not initialized.";
+ RunFileWriter writer = runFileWriters[pid];
+ if (writer == null) {
+ FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(refName);
+ writer = new RunFileWriter(file, ctx.getIoManager());
+ writer.open();
+ runFileWriters[pid] = writer;
+ }
+ return writer;
+ }
+
+ /**
+ * For every spilled partition, flushes its in-memory tuples to the run file of the given phase and deallocates
+ * its frames, then closes all run file writers of that phase.
+ * @param whichSide Build or Probe
+ * @throws HyracksDataException
+ */
+ void flushAndCloseAllSpilledPartitions(OptimizedHybridHashJoin.SIDE whichSide) throws HyracksDataException {
+
+ try {
+ for (int pid = spilledStatus.nextSetBit(0); pid >= 0 && pid < numOfPartitions; pid =
+ spilledStatus.nextSetBit(pid + 1)) {
+ if (bufferManager.getNumTuples(pid) > 0) {
+ bufferManager.flushPartition(pid, getRunFileWriterOrCreateNewOneIfNotExist(pid, whichSide));
+ bufferManager.clearPartition(pid);
+ }
+ }
+ } finally {
+ // Always close all run file writers, even if flushing fails.
+ closeAllSpilledPartitionWriters(whichSide);
+ }
+ }
+
+ /**
+ * Inserts the given tuple into partition pid in the build phase. A tuple larger than a frame whose partition is
+ * already spilled is written directly to disk; otherwise victims chosen by the spill policy are spilled until it fits.
+ * @param tid Tuple id
+ * @param pid Partition id
+ * @return true if at least one partition was spilled while inserting this tuple.
+ * @throws HyracksDataException
+ */
+ public boolean processTupleBuildPhase(int tid, int pid) throws HyracksDataException {
+ boolean flushed = false;
+ boolean isLargeRecord = accessorBuild.getTupleLength(tid) > ctx.getInitialFrameSize();
+ boolean processed = false;
+ while (!processed) {
+ if (spilledStatus.get(pid) && isLargeRecord) {
+ flushBigObjectToDisk(pid, OptimizedHybridHashJoin.SIDE.BUILD, accessorBuild, tid);
+ processed = true;
+ } else { // Either the record is small or the partition is in memory (or has not received any tuple yet)
+ processed = bufferManager.insertTuple(pid, accessorBuild, tid, tempPtr);
+ if (!processed) {
+ selectAndSpillVictim(pid);
+ flushed = true;
+ }
+ }
+ }
+ incrementTupleCount(pid, OptimizedHybridHashJoin.SIDE.BUILD);
+ return flushed;
+ }
+
+ /**
+ * Tries to insert the given tuple into the given partition during the probe phase. A tuple larger than all of the
+ * memory given to the probe phase (one frame per spilled partition) is written directly to disk. Otherwise, if the
+ * insertion fails, the target partition is spilled if it holds tuples, else the spilled partition with the largest
+ * memory usage is spilled; this repeats until the insertion succeeds.
+ * @param tid Tuple id
+ * @param pid Partition id
+ * @throws HyracksDataException
+ */
+ void processTupleProbePhase(int tid, int pid) throws HyracksDataException {
+ if (accessorProbe.getTupleLength(tid) > spilledStatus.cardinality() * ctx.getInitialFrameSize()) {
+ //Record is larger than all memory given to probe phase.
+ //TODO: There could be other policies, like flush every record that is larger than a frame.
+ flushBigObjectToDisk(pid, OptimizedHybridHashJoin.SIDE.PROBE, accessorProbe, tid);
+ } else {
+ while (!bufferManager.insertTuple(pid, accessorProbe, tid, tempPtr)) {
+ int victim = bufferManager.getNumTuples(pid) > 0 ? pid
+ : spillPolicy.findSpilledPartitionWithMaxMemoryUsage();
+ if (victim < 0) {
+ throw new HyracksDataException("Probe phase needs one frame reserved for each spilled partition.");
+ }
+ spillPartition(victim, OptimizedHybridHashJoin.SIDE.PROBE);
+ }
+ }
+ }
+
+ /**
+ * Selects a victim partition to spill based on spill policy, and then spills it to the disk.
+ * @param pid Partition id
+ * @throws HyracksDataException
+ */
+ private void selectAndSpillVictim(int pid) throws HyracksDataException {
+ int victimPartition = spillPolicy.selectVictimPartition(pid);
+ if (victimPartition < 0) {
+ throw new HyracksDataException(
+ "No more space left in the memory buffer, please assign more memory to hash-join.");
+ }
+ spillPartition(victimPartition, OptimizedHybridHashJoin.SIDE.BUILD);
+ }
+
+ /**
+ * Flushes the given partition from the given phase to the disk and deallocates its frames.
+ * @param pid Partition id
+ * @param whichSide Build or Probe
+ * @throws HyracksDataException
+ */
+ void spillPartition(int pid, OptimizedHybridHashJoin.SIDE whichSide) throws HyracksDataException {
+ RunFileWriter writer = getRunFileWriterOrCreateNewOneIfNotExist(pid, whichSide);
+ bufferManager.flushPartition(pid, writer);
+ bufferManager.clearPartition(pid);
+ if (whichSide == OptimizedHybridHashJoin.SIDE.BUILD) {
+ setSpilledStatusForPartition(pid);
+ }
+ }
+
+ /**
+ * Writes the given tuple directly to the run file of the given partition and phase, bypassing the join memory.
+ * @param pid Partition id
+ * @param whichSide Build or Probe
+ * @param accessor Tuple accessor within a frame
+ * @param tid Tuple id
+ * @throws HyracksDataException
+ */
+ /*TODO: The frame that holds this big object while it is flushed is allocated from the Hyracks context, so it is
+ * not counted against the memory budget and may push memory usage above the budget.*/
+ private void flushBigObjectToDisk(int pid, OptimizedHybridHashJoin.SIDE whichSide, FrameTupleAccessor accessor,
+ int tid) throws HyracksDataException {
+ if (bigFrameAppender == null) {
+ bigFrameAppender = new FrameTupleAppender(new VSizeFrame(ctx));
+ }
+ RunFileWriter runFileWriter = getRunFileWriterOrCreateNewOneIfNotExist(pid, whichSide);
+ if (!bigFrameAppender.append(accessor, tid)) {
+ throw new HyracksDataException("The given tuple is too big");
+ }
+ bigFrameAppender.write(runFileWriter, true);
+ }
+
+ /**
+ * Brings the spilled build partition with id = pid back into memory.
+ * @param pid partition id
+ * @return true if the partition was successfully read into memory.
+ * @throws HyracksDataException
+ */
+ /*TODO: The reload buffer is allocated from the Hyracks context, so it is not counted against the memory
+ * budget and may push memory usage above the budget.*/
+ public boolean loadSpilledPartitionToMem(int pid) throws HyracksDataException {
+ RunFileReader r = buildRFWriters[pid].createReader();
+ try {
+ r.open();
+ if (reloadBuffer == null) {
+ reloadBuffer = new VSizeFrame(ctx);
+ }
+ while (r.nextFrame(reloadBuffer)) {
+ accessorBuild.reset(reloadBuffer.getBuffer());
+ for (int tid = 0; tid < accessorBuild.getTupleCount(); tid++) {
+ if (bufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) {
+ continue;
+ }
+ // If the insertion fails for any reason (e.g., fragmentation),
+ // clear the frames occupied so far.
+ bufferManager.clearPartition(pid);
+ return false;
+ }
+ }
+ // The whole partition is now in memory, so delete the run file when the reader is closed.
+ r.setDeleteAfterClose(true);
+ } finally {
+ r.close();
+ }
+ unsetSpilledStatusForPartition(pid);
+ setRunFileWriter(pid, OptimizedHybridHashJoin.SIDE.BUILD, null);
+ return true;
+ }
+
+ /**
+ * Flushes the spilled probe partitions to disk, closes their run file writers, and closes the buffer manager.
+ * @throws HyracksDataException
+ */
+ public void releaseResources() throws HyracksDataException {
+ flushAndCloseAllSpilledPartitions(OptimizedHybridHashJoin.SIDE.PROBE);
+ bufferManager.close();
+ bufferManager = null;
+ }
+
+ /**
+ * Releases the frames that the frame pool has allocated but that are not currently in use.
+ */
+ public void releaseUnusedMemory() {
+ bufferManager.releaseUnusedMemory();
+ }
+
+ /**
+ * Shrinks the frame pool's budget to the memory allocated so far plus one reserved frame per spilled partition.
+ */
+ public void updateMemoryBudget() {
+ int unused = getSizeOfUnusedMemory();
+ framePool.decreaseMemoryBudget(memSizeInFrames * ctx.getInitialFrameSize() - unused);
+ }
+
+ /**
+ * Makes sure that the given partition id is a valid id and in the range of number of partitions.
+ * @param pid partition id
+ */
+ private void confirmValidPartitionAccess(int pid) {
+ assert pid >= 0 && pid < numOfPartitions : "Accessing a partition out of range.";
+ }
+
+ /************************
+ *
+ * Getters And Setters
+ *
+ ***********************/
+
+ public IPartitionedTupleBufferManager getBufferManager() {
+ return bufferManager;
+ }
+
+ public void setBufferManager(IPartitionedTupleBufferManager bufferManager) {
+ this.bufferManager = bufferManager;
+ }
+
+ public int getSizeOfUnusedMemory() {
+ // Reserves one frame for each spilled partition
+ return memSizeInFrames * ctx.getInitialFrameSize()
+ - (bufferManager.getSizeOfAllocatedMemory() + spilledStatus.cardinality() * ctx.getInitialFrameSize());
+ }
+
+ public RunFileWriter[] getBuildRFWriters() {
+ return buildRFWriters;
+ }
+
+ public void setRunFileWriter(int pid, OptimizedHybridHashJoin.SIDE whichSide, RunFileWriter rfw) {
+ confirmValidPartitionAccess(pid);
+ switch (whichSide) {
+ case BUILD:
+ buildRFWriters[pid] = rfw;
+ break;
+ case PROBE:
+ probeRFWriters[pid] = rfw;
+ break;
+ }
+
+ }
+
+ public BitSet getSpilledStatus() {
+ return spilledStatus;
+ }
+
+ private void setSpilledStatusForPartition(int pid) {
+ confirmValidPartitionAccess(pid);
+ spilledStatus.set(pid);
+ }
+
+ private void unsetSpilledStatusForPartition(int pid) {
+ confirmValidPartitionAccess(pid);
+ spilledStatus.set(pid, false);
+ }
+
+ public int getMaxPartitionSize(OptimizedHybridHashJoin.SIDE whichSide) throws HyracksDataException {
+ switch (whichSide) {
+ case BUILD:
+ return Arrays.stream(buildPSizeInTups).max().getAsInt();
+ case PROBE:
+ return Arrays.stream(probePSizeInTups).max().getAsInt();
+ default:
+ throw new HyracksDataException("Unknown join phase. It has to be either build or probe.");
+ }
+ }
+
+ public int getTuplesCount(int pid, OptimizedHybridHashJoin.SIDE whichSide) throws HyracksDataException {
+ confirmValidPartitionAccess(pid);
+ switch (whichSide) {
+ case BUILD:
+ return buildPSizeInTups[pid];
+ case PROBE:
+ return probePSizeInTups[pid];
+ default:
+ throw new HyracksDataException("Unknown join phase. It has to be either build or probe.");
+ }
+ }
+
+ public RunFileReader getBuildRFReader(int pid) throws HyracksDataException {
+ return buildRFWriters[pid] == null ? null : buildRFWriters[pid].createDeleteOnCloseReader();
+ }
+
+ public RunFileReader getProbeRFReader(int pid) throws HyracksDataException {
+ return probeRFWriters[pid] == null ? null : probeRFWriters[pid].createDeleteOnCloseReader();
+ }
+
+}
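The per-tuple decisions in processTupleBuildPhase() and processTupleProbePhase() can be summarized as pure functions. This is a simplified, hypothetical model: JoinRoutingSketch and its methods are illustrative and not part of Hyracks. It uses plain arrays instead of the buffer manager. The probe-phase victim choice is modeled on the code above, assuming findSpilledPartitionWithMaxMemoryUsage() returns -1 when no spilled partition holds memory.

```java
import java.util.BitSet;

// Hypothetical model (not part of Hyracks) of the per-tuple routing decisions in
// HashJoinPartitionsManager.processTupleBuildPhase() and processTupleProbePhase().
final class JoinRoutingSketch {
    enum Action { WRITE_TO_DISK_DIRECTLY, INSERT_INTO_MEMORY }

    private JoinRoutingSketch() {
    }

    // Build phase: a tuple larger than one frame whose partition is already spilled bypasses memory.
    static Action routeBuildTuple(int tupleLength, int frameSize, BitSet spilled, int pid) {
        boolean isLargeRecord = tupleLength > frameSize;
        return spilled.get(pid) && isLargeRecord ? Action.WRITE_TO_DISK_DIRECTLY : Action.INSERT_INTO_MEMORY;
    }

    // Probe phase: a tuple larger than all probe memory (one frame per spilled partition) bypasses memory.
    static Action routeProbeTuple(int tupleLength, int frameSize, BitSet spilled) {
        long probeMemory = (long) spilled.cardinality() * frameSize;
        return tupleLength > probeMemory ? Action.WRITE_TO_DISK_DIRECTLY : Action.INSERT_INTO_MEMORY;
    }

    // Probe phase, after a failed insert: spill the target partition if it holds tuples, otherwise the
    // spilled partition with the largest memory usage (first one wins ties). Returns -1 if none holds memory.
    static int chooseProbeVictim(int pid, int[] tuplesInMemory, int[] bytesInMemory, BitSet spilled) {
        if (tuplesInMemory[pid] > 0) {
            return pid;
        }
        int victim = -1;
        for (int p = spilled.nextSetBit(0); p >= 0; p = spilled.nextSetBit(p + 1)) {
            if (bytesInMemory[p] > 0 && (victim < 0 || bytesInMemory[p] > bytesInMemory[victim])) {
                victim = p;
            }
        }
        return victim;
    }

    public static void main(String[] args) {
        int frameSize = 32_768;
        BitSet spilled = new BitSet();
        spilled.set(1);
        spilled.set(3);
        System.out.println(routeBuildTuple(40_000, frameSize, spilled, 1)); // WRITE_TO_DISK_DIRECTLY
        System.out.println(routeBuildTuple(40_000, frameSize, spilled, 0)); // INSERT_INTO_MEMORY
        System.out.println(routeProbeTuple(70_000, frameSize, spilled)); // WRITE_TO_DISK_DIRECTLY (> 2 frames)
        int[] tuples = {0, 0, 0, 5};
        int[] bytes = {0, 0, 0, 65_536};
        System.out.println(chooseProbeVictim(1, tuples, bytes, spilled)); // 3
    }
}
```

A -1 from chooseProbeVictim() corresponds to the case where the patch throws "Probe phase needs one frame reserved for each spilled partition."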
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index c78e0dc..5139b2c 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -21,9 +21,7 @@
import java.nio.ByteBuffer;
import java.util.BitSet;
-import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IMissingWriter;
import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
@@ -32,30 +30,20 @@
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.io.RunFileReader;
-import org.apache.hyracks.dataflow.common.io.RunFileWriter;
import org.apache.hyracks.dataflow.std.buffermanager.DeallocatableFramePool;
import org.apache.hyracks.dataflow.std.buffermanager.FramePoolBackedFrameBufferManager;
import org.apache.hyracks.dataflow.std.buffermanager.IDeallocatableFramePool;
import org.apache.hyracks.dataflow.std.buffermanager.IPartitionedTupleBufferManager;
import org.apache.hyracks.dataflow.std.buffermanager.ISimpleFrameBufferManager;
-import org.apache.hyracks.dataflow.std.buffermanager.PreferToSpillFullyOccupiedFramePolicy;
-import org.apache.hyracks.dataflow.std.buffermanager.VPartitionTupleBufferManager;
import org.apache.hyracks.dataflow.std.structures.ISerializableTable;
import org.apache.hyracks.dataflow.std.structures.SerializableHashTable;
-import org.apache.hyracks.dataflow.std.structures.TuplePointer;
/**
* This class mainly applies one level of HHJ on a pair of
* relations. It is always called by the descriptor.
*/
public class OptimizedHybridHashJoin {
-
- // Used for special probe BigObject which can not be held into the Join memory
- private FrameTupleAppender bigProbeFrameAppender;
public enum SIDE {
BUILD,
@@ -64,9 +52,6 @@
private IHyracksTaskContext ctx;
- private final String buildRelName;
- private final String probeRelName;
-
private final ITuplePairComparator comparator;
private final ITuplePartitionComputer buildHpc;
private final ITuplePartitionComputer probeHpc;
@@ -74,40 +59,30 @@
private final RecordDescriptor buildRd;
private final RecordDescriptor probeRd;
- private RunFileWriter[] buildRFWriters; //writing spilled build partitions
- private RunFileWriter[] probeRFWriters; //writing spilled probe partitions
-
private final IPredicateEvaluator predEvaluator;
private final boolean isLeftOuter;
private final IMissingWriter[] nonMatchWriters;
- private final BitSet spilledStatus; //0=resident, 1=spilled
private final int numOfPartitions;
private final int memSizeInFrames;
private InMemoryHashJoin inMemJoiner; //Used for joining resident partitions
-
- private IPartitionedTupleBufferManager bufferManager;
- private PreferToSpillFullyOccupiedFramePolicy spillPolicy;
private final FrameTupleAccessor accessorBuild;
private final FrameTupleAccessor accessorProbe;
private IDeallocatableFramePool framePool;
private ISimpleFrameBufferManager bufferManagerForHashTable;
+ private HashJoinPartitionsManager partitionsManager;
- private boolean isReversed; //Added for handling correct calling for predicate-evaluator upon recursive calls that cause role-reversal
+ private boolean debug = false;
- // stats information
- private int[] buildPSizeInTups;
- private IFrame reloadBuffer;
- private TuplePointer tempPtr = new TuplePointer(); // this is a reusable object to store the pointer,which is not used anywhere.
- // we mainly use it to match the corresponding function signature.
- private int[] probePSizeInTups;
+ // Set on recursive calls that reverse the build/probe roles, so the predicate evaluator is called correctly
+ private boolean isReversed;
- public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memSizeInFrames, int numOfPartitions,
- String probeRelName, String buildRelName, ITuplePairComparator comparator, RecordDescriptor probeRd,
- RecordDescriptor buildRd, ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc,
- IPredicateEvaluator predEval, boolean isLeftOuter, IMissingWriterFactory[] nullWriterFactories1) {
+ OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memSizeInFrames, int numOfPartitions, String probeRelName,
+ String buildRelName, ITuplePairComparator comparator, RecordDescriptor probeRd, RecordDescriptor buildRd,
+ ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc, IPredicateEvaluator predEval,
+ boolean isLeftOuter, IMissingWriterFactory[] nullWriterFactories1) {
this.ctx = ctx;
this.memSizeInFrames = memSizeInFrames;
this.buildRd = buildRd;
@@ -115,12 +90,7 @@
this.buildHpc = buildHpc;
this.probeHpc = probeHpc;
this.comparator = comparator;
- this.buildRelName = buildRelName;
- this.probeRelName = probeRelName;
-
this.numOfPartitions = numOfPartitions;
- this.buildRFWriters = new RunFileWriter[numOfPartitions];
- this.probeRFWriters = new RunFileWriter[numOfPartitions];
this.accessorBuild = new FrameTupleAccessor(buildRd);
this.accessorProbe = new FrameTupleAccessor(probeRd);
@@ -129,7 +99,8 @@
this.isLeftOuter = isLeftOuter;
this.isReversed = false;
- this.spilledStatus = new BitSet(numOfPartitions);
+ this.partitionsManager = new HashJoinPartitionsManager(ctx, memSizeInFrames, numOfPartitions, accessorBuild,
+ accessorProbe, buildRelName, probeRelName);
this.nonMatchWriters = isLeftOuter ? new IMissingWriter[nullWriterFactories1.length] : null;
if (isLeftOuter) {
@@ -139,134 +110,73 @@
}
}
- public void initBuild() throws HyracksDataException {
- framePool = new DeallocatableFramePool(ctx, memSizeInFrames * ctx.getInitialFrameSize());
- bufferManagerForHashTable = new FramePoolBackedFrameBufferManager(framePool);
- bufferManager = new VPartitionTupleBufferManager(
- PreferToSpillFullyOccupiedFramePolicy.createAtMostOneFrameForSpilledPartitionConstrain(spilledStatus),
- numOfPartitions, framePool);
- spillPolicy = new PreferToSpillFullyOccupiedFramePolicy(bufferManager, spilledStatus);
- spilledStatus.clear();
- buildPSizeInTups = new int[numOfPartitions];
+ /**
+ * Initializes the partition manager.
+ * @throws HyracksDataException
+ */
+ void initBuild() throws HyracksDataException {
+ partitionsManager.init();
}
+ /**
+ * Assigns each tuple of the incoming build buffer to a partition and inserts it into that partition.
+ * @param buffer a frame of build-side tuples
+ * @throws HyracksDataException
+ */
public void build(ByteBuffer buffer) throws HyracksDataException {
accessorBuild.reset(buffer);
int tupleCount = accessorBuild.getTupleCount();
-
for (int i = 0; i < tupleCount; ++i) {
int pid = buildHpc.partition(accessorBuild, i, numOfPartitions);
- processTuple(i, pid);
- buildPSizeInTups[pid]++;
- }
-
- }
-
- private void processTuple(int tid, int pid) throws HyracksDataException {
- while (!bufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) {
- selectAndSpillVictim(pid);
+ partitionsManager.processTupleBuildPhase(i, pid);
}
}
- private void selectAndSpillVictim(int pid) throws HyracksDataException {
- int victimPartition = spillPolicy.selectVictimPartition(pid);
- if (victimPartition < 0) {
- throw new HyracksDataException(
- "No more space left in the memory buffer, please assign more memory to hash-join.");
- }
- spillPartition(victimPartition);
- }
-
- private void spillPartition(int pid) throws HyracksDataException {
- RunFileWriter writer = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.BUILD);
- bufferManager.flushPartition(pid, writer);
- bufferManager.clearPartition(pid);
- spilledStatus.set(pid);
- }
-
- private void closeBuildPartition(int pid) throws HyracksDataException {
- if (buildRFWriters[pid] == null) {
- throw new HyracksDataException("Tried to close the non-existing file writer.");
- }
- buildRFWriters[pid].close();
- }
-
- private RunFileWriter getSpillWriterOrCreateNewOneIfNotExist(int pid, SIDE whichSide) throws HyracksDataException {
- RunFileWriter[] runFileWriters = null;
- String refName = null;
- switch (whichSide) {
- case BUILD:
- runFileWriters = buildRFWriters;
- refName = buildRelName;
- break;
- case PROBE:
- refName = probeRelName;
- runFileWriters = probeRFWriters;
- break;
- }
- RunFileWriter writer = runFileWriters[pid];
- if (writer == null) {
- FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(refName);
- writer = new RunFileWriter(file, ctx.getIoManager());
- writer.open();
- runFileWriters[pid] = writer;
- }
- return writer;
- }
-
- public void closeBuild() throws HyracksDataException {
+ /**
+ * Called at the end of the build phase. Flushes all spilled partitions to disk, makes enough space for
+ * the hash table, and then releases any memory the partition manager no longer needs. If any partitions
+ * remain in memory, the leftover memory is used to build the hash table and load those partitions into it.
+ *
+ * @throws HyracksDataException
+ */
+ void closeBuild() throws HyracksDataException {
// Flushes the remaining chunks of the all spilled partitions to the disk.
- closeAllSpilledPartitions(SIDE.BUILD);
+ partitionsManager.flushAndCloseAllSpilledPartitions(SIDE.BUILD);
// Makes the space for the in-memory hash table (some partitions may need to be spilled to the disk
// during this step in order to make the space.)
// and tries to bring back as many spilled partitions as possible if there is free space.
int inMemTupCount = makeSpaceForHashTableAndBringBackSpilledPartitions();
+ // At this point all partitions are final (no further reloads or spills), so the unused memory can be
+ // released and reused by the hash table's buffer manager. Frames of in-memory partitions are not
+ // released here, because they are copied into the in-memory joiner when the data is loaded.
+ partitionsManager.releaseUnusedMemory();
+
+ // Reduces (or leaves unchanged) the partition manager's memory budget to the memory it has actually
+ // allocated, so that the partition manager's budget plus the hash table's budget equals the hash join's
+ // memory budget.
+ partitionsManager.updateMemoryBudget();
+
+ initHashTable();
+
createInMemoryJoiner(inMemTupCount);
loadDataInMemJoin();
+
+ checkOneFrameReservedPerSpilledPartitions();
}
/**
- * In case of failure happens, we need to clear up the generated temporary files.
+ * Initializes the hash table's frame pool and buffer manager with the memory left over by the
+ * partition manager.
*/
- public void clearBuildTempFiles() throws HyracksDataException {
- for (int i = 0; i < buildRFWriters.length; i++) {
- if (buildRFWriters[i] != null) {
- buildRFWriters[i].erase();
- }
- }
- }
-
- private void closeAllSpilledPartitions(SIDE whichSide) throws HyracksDataException {
- RunFileWriter[] runFileWriters = null;
- switch (whichSide) {
- case BUILD:
- runFileWriters = buildRFWriters;
- break;
- case PROBE:
- runFileWriters = probeRFWriters;
- break;
- }
- try {
- for (int pid = spilledStatus.nextSetBit(0); pid >= 0 && pid < numOfPartitions; pid =
- spilledStatus.nextSetBit(pid + 1)) {
- if (bufferManager.getNumTuples(pid) > 0) {
- bufferManager.flushPartition(pid, getSpillWriterOrCreateNewOneIfNotExist(pid, whichSide));
- bufferManager.clearPartition(pid);
- }
- }
- } finally {
- // Force to close all run file writers.
- if (runFileWriters != null) {
- for (RunFileWriter runFileWriter : runFileWriters) {
- if (runFileWriter != null) {
- runFileWriter.close();
- }
- }
- }
- }
+ private void initHashTable() {
+ int size = partitionsManager.getSizeOfUnusedMemory();
+ framePool = new DeallocatableFramePool(ctx, size);
+ bufferManagerForHashTable = new FramePoolBackedFrameBufferManager(framePool);
}
/**
@@ -278,93 +188,119 @@
* @throws HyracksDataException
*/
private int makeSpaceForHashTableAndBringBackSpilledPartitions() throws HyracksDataException {
- // we need number of |spilledPartitions| buffers to store the probe data
- int frameSize = ctx.getInitialFrameSize();
- long freeSpace = (long) (memSizeInFrames - spilledStatus.cardinality()) * frameSize;
- // For partitions in main memory, we deduct their size from the free space.
+ int frameSize = ctx.getInitialFrameSize();
+ BitSet spilledStatus = partitionsManager.getSpilledStatus();
+ long freeSpace = partitionsManager.getSizeOfUnusedMemory();
+
int inMemTupCount = 0;
for (int p = spilledStatus.nextClearBit(0); p >= 0 && p < numOfPartitions; p =
spilledStatus.nextClearBit(p + 1)) {
- freeSpace -= bufferManager.getPhysicalSize(p);
- inMemTupCount += buildPSizeInTups[p];
+ inMemTupCount += partitionsManager.getTuplesCount(p, SIDE.BUILD);
}
- // Calculates the expected hash table size for the given number of tuples in main memory
- // and deducts it from the free space.
long hashTableByteSizeForInMemTuples = SerializableHashTable.getExpectedTableByteSize(inMemTupCount, frameSize);
- freeSpace -= hashTableByteSizeForInMemTuples;
- // In the case where free space is less than zero after considering the hash table size,
- // we need to spill more partitions until we can accommodate the hash table in memory.
- // TODO: there may be different policies (keep spilling minimum, spilling maximum, find a similar size to the
- // hash table, or keep spilling from the first partition)
- boolean moreSpilled = false;
+ if (freeSpace - hashTableByteSizeForInMemTuples < 0) {
+ int initialInMemTupCount = inMemTupCount;
+ inMemTupCount = spillPartitionsToAccommodateHashTable(hashTableByteSizeForInMemTuples, inMemTupCount);
+ long hashTableSizeDecrease =
+ -SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(initialInMemTupCount,
+ -(initialInMemTupCount - inMemTupCount), frameSize);
+ hashTableByteSizeForInMemTuples += hashTableSizeDecrease;
+ }
- // No space to accommodate the hash table? Then, we spill one or more partitions to the disk.
- if (freeSpace < 0) {
- // Tries to find a best-fit partition not to spill many partitions.
- int pidToSpill = selectSinglePartitionToSpill(freeSpace, inMemTupCount, frameSize);
- if (pidToSpill >= 0) {
- // There is a suitable one. We spill that partition to the disk.
- long hashTableSizeDecrease =
- -SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(inMemTupCount,
- -buildPSizeInTups[pidToSpill], frameSize);
- freeSpace = freeSpace + bufferManager.getPhysicalSize(pidToSpill) + hashTableSizeDecrease;
- inMemTupCount -= buildPSizeInTups[pidToSpill];
- spillPartition(pidToSpill);
- closeBuildPartition(pidToSpill);
- moreSpilled = true;
- } else {
- // There is no single suitable partition. So, we need to spill multiple partitions to the disk
- // in order to accommodate the hash table.
- for (int p = spilledStatus.nextClearBit(0); p >= 0 && p < numOfPartitions; p =
- spilledStatus.nextClearBit(p + 1)) {
- int spaceToBeReturned = bufferManager.getPhysicalSize(p);
- int numberOfTuplesToBeSpilled = buildPSizeInTups[p];
- if (spaceToBeReturned == 0 || numberOfTuplesToBeSpilled == 0) {
- continue;
- }
- spillPartition(p);
- closeBuildPartition(p);
- moreSpilled = true;
+ inMemTupCount = bringPartitionsBack(hashTableByteSizeForInMemTuples, inMemTupCount);
+
+ return inMemTupCount;
+ }
+
+ /**
+ * Spills one or more partitions to disk to make room for the hash table.
+ *
+ * @param hashTableByteSizeForInMemTuples expected byte size of the hash table for the current in-memory tuples
+ * @param inMemTupCount number of tuples currently in memory
+ * @return number of tuples remaining in memory after spilling
+ * @throws HyracksDataException
+ */
+ private int spillPartitionsToAccommodateHashTable(long hashTableByteSizeForInMemTuples, int inMemTupCount)
+ throws HyracksDataException {
+ int frameSize = ctx.getInitialFrameSize();
+ int currentMemoryCounts = inMemTupCount;
+ long currentHashTableSize = hashTableByteSizeForInMemTuples;
+ long freeSpace = calculateFreeSpaceForHashTable(currentHashTableSize);
+ int pidToSpill = selectSinglePartitionToSpill(freeSpace, currentMemoryCounts, frameSize);
+ if (pidToSpill >= 0) {
+ currentMemoryCounts -= partitionsManager.getTuplesCount(pidToSpill, SIDE.BUILD);
+ partitionsManager.spillPartition(pidToSpill, SIDE.BUILD);
+ partitionsManager.closeBuildPartitionWriter(pidToSpill);
+ if (debug) {
+ checkOneFrameReservedPerSpilledPartitions();
+ }
+ } else {
+ BitSet spilledStatus = partitionsManager.getSpilledStatus();
+ for (int p = spilledStatus.nextClearBit(0); p >= 0 && p < numOfPartitions; p =
+ spilledStatus.nextClearBit(p + 1)) {
+ int spaceToBeReturned = partitionsManager.getBufferManager().getPhysicalSize(p);
+ int numberOfTuplesToBeSpilled = partitionsManager.getTuplesCount(p, SIDE.BUILD);
+ if (spaceToBeReturned > 0 && numberOfTuplesToBeSpilled > 0) {
+ partitionsManager.spillPartition(p, SIDE.BUILD);
+ partitionsManager.closeBuildPartitionWriter(p);
// Since the number of tuples in memory has been decreased,
// the hash table size will be decreased, too.
// We put minus since the method returns a negative value to represent a newly reclaimed space.
long expectedHashTableSizeDecrease =
- -SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(inMemTupCount,
+ -SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(currentMemoryCounts,
-numberOfTuplesToBeSpilled, frameSize);
- freeSpace = freeSpace + spaceToBeReturned + expectedHashTableSizeDecrease;
- // Adjusts the hash table size
- inMemTupCount -= numberOfTuplesToBeSpilled;
+ currentHashTableSize += expectedHashTableSizeDecrease;
+ freeSpace = calculateFreeSpaceForHashTable(currentHashTableSize);
+
+ currentMemoryCounts -= numberOfTuplesToBeSpilled;
+ if (debug) {
+ checkOneFrameReservedPerSpilledPartitions();
+ }
if (freeSpace >= 0) {
break;
}
}
}
}
+ return currentMemoryCounts;
+ }
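To make the fallback branch of spillPartitionsToAccommodateHashTable concrete, here is a minimal Java sketch under simplifying assumptions. The class and method names are hypothetical, the hash table is assumed to cost a fixed 16 bytes per in-memory tuple (the real SerializableHashTable sizing is different), and a spilled partition is assumed to keep exactly one frame reserved. It spills non-empty resident partitions in id order and recomputes the free space after each spill, stopping as soon as the free space is non-negative.

```java
import java.util.BitSet;

// Hypothetical model of the "spill resident partitions in order" fallback; not the Hyracks API.
final class SequentialSpillSketch {
    // Assumed cost model: a fixed number of hash-table bytes per in-memory tuple.
    static final long HASH_BYTES_PER_TUPLE = 16;

    /**
     * @param unusedBytes    memory not used by partition buffers
     * @param partitionBytes bytes currently held in memory by each partition
     * @param tupleCounts    build tuples in each partition
     * @param spilled        spilled status per partition, updated in place
     * @param frameSize      frame size in bytes
     * @return number of build tuples still in memory
     */
    static int spillUntilHashTableFits(long unusedBytes, long[] partitionBytes, int[] tupleCounts,
            BitSet spilled, int frameSize) {
        int numPartitions = tupleCounts.length;
        int inMemTuples = 0;
        for (int p = spilled.nextClearBit(0); p < numPartitions; p = spilled.nextClearBit(p + 1)) {
            inMemTuples += tupleCounts[p];
        }
        long freeSpace = unusedBytes - inMemTuples * HASH_BYTES_PER_TUPLE;
        for (int p = spilled.nextClearBit(0); p < numPartitions && freeSpace < 0;
                p = spilled.nextClearBit(p + 1)) {
            if (partitionBytes[p] == 0 || tupleCounts[p] == 0) {
                continue; // spilling an empty partition frees nothing
            }
            spilled.set(p);
            // Assumption: all of its frames but one return to the pool; one stays reserved.
            unusedBytes += partitionBytes[p] - frameSize;
            inMemTuples -= tupleCounts[p];
            // Recompute the free space with the smaller hash table.
            freeSpace = unusedBytes - inMemTuples * HASH_BYTES_PER_TUPLE;
        }
        return inMemTuples;
    }
}
```

With 100 unused bytes and three resident partitions of 64 bytes and 4 tuples each (8-byte frames), spilling partition 0 alone is enough, leaving 8 tuples in memory.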
- // If more partitions have been spilled to the disk, calculate the expected hash table size again
- // before bringing some partitions to main memory.
- if (moreSpilled) {
- hashTableByteSizeForInMemTuples = SerializableHashTable.getExpectedTableByteSize(inMemTupCount, frameSize);
- }
-
- // Brings back some partitions if there is enough free space.
- int pid = 0;
- while ((pid = selectPartitionsToReload(freeSpace, pid, inMemTupCount)) >= 0) {
- if (!loadSpilledPartitionToMem(pid, buildRFWriters[pid])) {
+ /**
+ * Reloads spilled partitions into memory as long as there is free memory and a spilled partition fits in it.
+ *
+ * @param hashTableByteSizeForInMemTuples current hash table size, used to recalculate the hash table size.
+ * @param inMemTupCount current number of tuples in memory, used to recalculate the hash table size.
+ * @return number of in-memory tuples after reloading zero or more partitions.
+ * @throws HyracksDataException
+ */
+ private int bringPartitionsBack(long hashTableByteSizeForInMemTuples, int inMemTupCount)
+ throws HyracksDataException {
+ int pid;
+ int currentMemoryTupleCount = inMemTupCount;
+ while ((pid = selectAPartitionToReload(hashTableByteSizeForInMemTuples, currentMemoryTupleCount)) >= 0) {
+ if (!partitionsManager.loadSpilledPartitionToMem(pid)) {
break;
}
- long expectedHashTableByteSizeIncrease = SerializableHashTable
- .calculateByteSizeDeltaForTableSizeChange(inMemTupCount, buildPSizeInTups[pid], frameSize);
- freeSpace = freeSpace - bufferManager.getPhysicalSize(pid) - expectedHashTableByteSizeIncrease;
- inMemTupCount += buildPSizeInTups[pid];
- // Adjusts the hash table size
- hashTableByteSizeForInMemTuples += expectedHashTableByteSizeIncrease;
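+ int reloadedTupleCount = partitionsManager.getTuplesCount(pid, SIDE.BUILD);
+ // Grow the hash table size so that the next selectAPartitionToReload call accounts for this reload.
+ hashTableByteSizeForInMemTuples += SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(
+ currentMemoryTupleCount, reloadedTupleCount, ctx.getInitialFrameSize());
+ currentMemoryTupleCount += reloadedTupleCount;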
}
+ if (debug) {
+ checkOneFrameReservedPerSpilledPartitions();
+ }
+ return currentMemoryTupleCount;
+ }
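The reload step can be modeled as a greedy loop. The Java sketch below is a simplified illustration, not the Hyracks code: the names are hypothetical, the hash table is assumed to cost 16 bytes per tuple, a spilled partition's file size stands in for the memory it needs, and the per-partition reserved frame is ignored. Its key point is that the hash table size is increased after every reload, so each later fit check sees the cumulative growth rather than the size from before the first reload.

```java
import java.util.BitSet;

// Hypothetical greedy reload model; names and cost model are assumptions.
final class GreedyReloadSketch {
    static final long HASH_BYTES_PER_TUPLE = 16; // assumed linear hash-table cost

    /** Reloads spilled partitions while one fits; returns the final in-memory tuple count. */
    static int bringPartitionsBack(long unusedBytes, long hashTableBytes, int inMemTuples,
            long[] spilledFileBytes, int[] tupleCounts, BitSet spilled) {
        while (true) {
            long freeSpace = unusedBytes - hashTableBytes;
            int pid = -1;
            // Pick the first spilled partition whose data plus hash-table growth fits.
            for (int i = spilled.nextSetBit(0); i >= 0 && i < tupleCounts.length;
                    i = spilled.nextSetBit(i + 1)) {
                long growth = tupleCounts[i] * HASH_BYTES_PER_TUPLE;
                if (freeSpace >= spilledFileBytes[i] + growth) {
                    pid = i;
                    break;
                }
            }
            if (pid < 0) {
                return inMemTuples; // nothing else fits
            }
            spilled.clear(pid);
            unusedBytes -= spilledFileBytes[pid];
            hashTableBytes += tupleCounts[pid] * HASH_BYTES_PER_TUPLE; // cumulative growth
            inMemTuples += tupleCounts[pid];
        }
    }
}
```

In the test scenario, partition 1 is reloaded; partition 2 would still appear to fit if the hash table size were left at its initial 32 bytes, but with the updated size it correctly stays on disk.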
- return inMemTupCount;
+ /**
+ * Calculates the free space left after subtracting the current hash table size from the memory that is
+ * not used by the partition manager.
+ *
+ * @param hashTableByteSizeForInMemTuples size of the hash table
+ * @return free space, in bytes, used by neither the partition manager nor the hash table; may be negative.
+ */
+ private long calculateFreeSpaceForHashTable(long hashTableByteSizeForInMemTuples) {
+ return partitionsManager.getSizeOfUnusedMemory() - hashTableByteSizeForInMemTuples;
}
/**
@@ -372,19 +308,24 @@
*
* @return the partition id that will be spilled to the disk. Returns -1 if there is no single suitable partition.
*/
- private int selectSinglePartitionToSpill(long currentFreeSpace, int currentInMemTupCount, int frameSize) {
+ private int selectSinglePartitionToSpill(long currentFreeSpace, int currentInMemTupCount, int frameSize)
+ throws HyracksDataException {
long spaceAfterSpill;
+ IPartitionedTupleBufferManager bufferManager = partitionsManager.getBufferManager();
long minSpaceAfterSpill = (long) memSizeInFrames * frameSize;
int minSpaceAfterSpillPartID = -1;
-
+ BitSet spilledStatus = partitionsManager.getSpilledStatus();
for (int p = spilledStatus.nextClearBit(0); p >= 0 && p < numOfPartitions; p =
spilledStatus.nextClearBit(p + 1)) {
- if (buildPSizeInTups[p] == 0 || bufferManager.getPhysicalSize(p) == 0) {
+ if (partitionsManager.getTuplesCount(p, SIDE.BUILD) == 0 || bufferManager.getPhysicalSize(p) == 0) {
continue;
}
// We put minus since the method returns a negative value to represent a newly reclaimed space.
- spaceAfterSpill = currentFreeSpace + bufferManager.getPhysicalSize(p) + (-SerializableHashTable
- .calculateByteSizeDeltaForTableSizeChange(currentInMemTupCount, -buildPSizeInTups[p], frameSize));
+ // One frame is deducted because, if this partition is spilled, one frame must stay reserved for it.
+ spaceAfterSpill = currentFreeSpace + bufferManager.getPhysicalSize(p) - ctx.getInitialFrameSize()
+ + (-SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(currentInMemTupCount,
+ -partitionsManager.getTuplesCount(p, SIDE.BUILD), frameSize));
if (spaceAfterSpill == 0) {
// Found the perfect one. Just returns this partition.
return p;
@@ -397,49 +338,38 @@
return minSpaceAfterSpillPartID;
}
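The best-fit selection in selectSinglePartitionToSpill can be illustrated with a small standalone Java sketch. Names are hypothetical and the hash table is assumed to cost 16 bytes per tuple. Some lines of the real method between the exact-fit check and the return are not shown in this excerpt, so the rule "the smallest positive leftover wins" is an assumption based on the method's best-fit intent. The leftover formula mirrors the diff: released partition memory, minus one reserved frame, plus the hash-table space the partition's tuples no longer need.

```java
import java.util.BitSet;

// Hypothetical best-fit spill selection; cost model and tie-breaking are assumptions.
final class BestFitSpillSketch {
    static final long HASH_BYTES_PER_TUPLE = 16; // assumed linear hash-table cost

    /** Returns the resident partition whose spill leaves the least non-negative free space, or -1. */
    static int selectSinglePartitionToSpill(long freeSpace, long[] partitionBytes, int[] tupleCounts,
            BitSet spilled, int frameSize) {
        long bestSpaceAfterSpill = Long.MAX_VALUE;
        int bestPid = -1;
        for (int p = spilled.nextClearBit(0); p < tupleCounts.length; p = spilled.nextClearBit(p + 1)) {
            if (tupleCounts[p] == 0 || partitionBytes[p] == 0) {
                continue; // nothing to gain from spilling an empty partition
            }
            // Freed partition memory, minus the frame kept for the spilled partition,
            // plus the hash-table bytes its tuples no longer need.
            long spaceAfterSpill = freeSpace + partitionBytes[p] - frameSize
                    + tupleCounts[p] * HASH_BYTES_PER_TUPLE;
            if (spaceAfterSpill == 0) {
                return p; // exact fit
            }
            if (spaceAfterSpill > 0 && spaceAfterSpill < bestSpaceAfterSpill) {
                bestSpaceAfterSpill = spaceAfterSpill;
                bestPid = p;
            }
        }
        return bestPid; // -1 means no single partition suffices
    }
}
```

When no single partition frees enough space, the sketch returns -1, which corresponds to the case where the caller falls back to spilling several partitions.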
- private int selectPartitionsToReload(long freeSpace, int pid, int inMemTupCount) {
- for (int i = spilledStatus.nextSetBit(pid); i >= 0 && i < numOfPartitions; i =
- spilledStatus.nextSetBit(i + 1)) {
- int spilledTupleCount = buildPSizeInTups[i];
- // Expected hash table size increase after reloading this partition
- long expectedHashTableByteSizeIncrease = SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(
- inMemTupCount, spilledTupleCount, ctx.getInitialFrameSize());
- if (freeSpace >= buildRFWriters[i].getFileSize() + expectedHashTableByteSizeIncrease) {
- return i;
+ /**
+ * Finds a spilled partition that fits in the leftover memory, including the hash table growth it causes.
+ * @param hashTableByteSizeForInMemTuples size of the current hash table
+ * @param inMemTupCount number of tuples currently in memory
+ * @return the id of a spilled partition that can be reloaded, or -1 if none fits.
+ * @throws HyracksDataException
+ */
+ private int selectAPartitionToReload(long hashTableByteSizeForInMemTuples, int inMemTupCount)
+ throws HyracksDataException {
+ long freeSpace = calculateFreeSpaceForHashTable(hashTableByteSizeForInMemTuples);
+ if (freeSpace > 0) {
+ BitSet spilledStatus = partitionsManager.getSpilledStatus();
+ for (int i = spilledStatus.nextSetBit(0); i >= 0 && i < numOfPartitions; i =
+ spilledStatus.nextSetBit(i + 1)) {
+ int spilledTupleCount = partitionsManager.getTuplesCount(i, SIDE.BUILD);
+ // Expected hash table size increase after reloading this partition
+ long expectedHashTableByteSizeIncrease = SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(
+ inMemTupCount, spilledTupleCount, ctx.getInitialFrameSize());
+ if (freeSpace >= partitionsManager.getBuildRFWriters()[i].getFileSize()
+ + expectedHashTableByteSizeIncrease) {
+ return i;
+ }
}
}
return -1;
}
- private boolean loadSpilledPartitionToMem(int pid, RunFileWriter wr) throws HyracksDataException {
- RunFileReader r = wr.createReader();
- try {
- r.open();
- if (reloadBuffer == null) {
- reloadBuffer = new VSizeFrame(ctx);
- }
- while (r.nextFrame(reloadBuffer)) {
- accessorBuild.reset(reloadBuffer.getBuffer());
- for (int tid = 0; tid < accessorBuild.getTupleCount(); tid++) {
- if (bufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) {
- continue;
- }
- // for some reason (e.g. due to fragmentation) if the inserting failed,
- // we need to clear the occupied frames
- bufferManager.clearPartition(pid);
- return false;
- }
- }
- // Closes and deletes the run file if it is already loaded into memory.
- r.setDeleteAfterClose(true);
- } finally {
- r.close();
- }
- spilledStatus.set(pid, false);
- buildRFWriters[pid] = null;
- return true;
- }
-
+ /**
+ * Initializes the hash table and the in-memory joiner.
+ * @param inMemTupCount number of tuples currently in memory.
+ * @throws HyracksDataException
+ */
private void createInMemoryJoiner(int inMemTupCount) throws HyracksDataException {
ISerializableTable table = new SerializableHashTable(inMemTupCount, ctx, bufferManagerForHashTable);
this.inMemJoiner = new InMemoryHashJoin(ctx, new FrameTupleAccessor(probeRd), probeHpc,
@@ -447,13 +377,19 @@
predEvaluator, isReversed, bufferManagerForHashTable);
}
+ /**
+ * Loads the data of the in-memory partitions into the in-memory joiner.
+ * @throws HyracksDataException
+ */
private void loadDataInMemJoin() throws HyracksDataException {
for (int pid = 0; pid < numOfPartitions; pid++) {
- if (!spilledStatus.get(pid)) {
- bufferManager.flushPartition(pid, new IFrameWriter() {
+ if (!partitionsManager.isPartitionSpilled(pid)) {
+ partitionsManager.getBufferManager().flushPartition(pid, new IFrameWriter() {
@Override
public void open() throws HyracksDataException {
+ // Intentionally empty: this writer only forwards the frames of the given partition
+ // to the in-memory joiner, so only nextFrame() does any work.
}
@@ -464,143 +400,88 @@
@Override
public void fail() throws HyracksDataException {
-
+ // Intentionally empty: this writer only forwards the frames of the given partition
+ // to the in-memory joiner, so only nextFrame() does any work.
}
@Override
public void close() throws HyracksDataException {
-
+ // Intentionally empty: this writer only forwards the frames of the given partition
+ // to the in-memory joiner, so only nextFrame() does any work.
}
});
}
}
}
- public void initProbe() throws HyracksDataException {
-
- probePSizeInTups = new int[numOfPartitions];
- probeRFWriters = new RunFileWriter[numOfPartitions];
-
- }
-
+ /**
+ * Processes the tuples of an incoming probe buffer. The following cases can occur:
+ * 1) All partitions are in memory: the whole buffer is passed to the in-memory joiner.
+ * 2) At least one partition is spilled; each tuple is handled individually:
+ * 2-1) It is not a left outer join and the corresponding build partition received no records:
+ * the tuple cannot match anything, so it is discarded.
+ * 2-2) Otherwise, if the tuple belongs to a spilled partition, it is added to that partition's
+ * output buffer; if it belongs to an in-memory partition, it is joined with the in-memory build
+ * tuples by the in-memory joiner.
+ * @param buffer the incoming buffer from the probe relation
+ * @param writer the writer that receives the output of the in-memory joiner
+ * @throws HyracksDataException
+ */
public void probe(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
- accessorProbe.reset(buffer);
- int tupleCount = accessorProbe.getTupleCount();
- if (isBuildRelAllInMemory()) {
+ if (partitionsManager.isBuildRelAllInMemory()) {
inMemJoiner.join(buffer, writer);
return;
}
- inMemJoiner.resetAccessorProbe(accessorProbe);
- for (int i = 0; i < tupleCount; ++i) {
- int pid = probeHpc.partition(accessorProbe, i, numOfPartitions);
- if (buildPSizeInTups[pid] > 0 || isLeftOuter) { //Tuple has potential match from previous phase
- if (spilledStatus.get(pid)) { //pid is Spilled
- while (!bufferManager.insertTuple(pid, accessorProbe, i, tempPtr)) {
- int victim = pid;
- if (bufferManager.getNumTuples(pid) == 0) { // current pid is empty, choose the biggest one
- victim = spillPolicy.findSpilledPartitionWithMaxMemoryUsage();
- }
- if (victim < 0) { // current tuple is too big for all the free space
- flushBigProbeObjectToDisk(pid, accessorProbe, i);
- break;
- }
- RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(victim, SIDE.PROBE);
- bufferManager.flushPartition(victim, runFileWriter);
- bufferManager.clearPartition(victim);
- }
+ accessorProbe.reset(buffer);
+ inMemJoiner.resetAccessorProbe(accessorProbe);
+
+ int tupleCount = accessorProbe.getTupleCount();
+ for (int tid = 0; tid < tupleCount; ++tid) {
+ int pid = probeHpc.partition(accessorProbe, tid, numOfPartitions);
+
+ if (partitionsManager.getTuplesCount(pid, SIDE.BUILD) > 0 || isLeftOuter) {
+ //Tuple has potential match from previous phase
+ if (partitionsManager.getSpilledStatus().get(pid)) { //pid is Spilled
+ partitionsManager.processTupleProbePhase(tid, pid);
} else { //pid is Resident
- inMemJoiner.join(i, writer);
+ inMemJoiner.join(tid, writer);
}
- probePSizeInTups[pid]++;
+ partitionsManager.incrementTupleCount(pid, SIDE.PROBE);
}
}
}
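The per-tuple decision in probe() can be summarized as a pure function. This Java sketch is a hypothetical illustration (the Route enum and the route method do not exist in Hyracks); it only mirrors the case numbering of the probe() Javadoc, taking the build-side tuple counts and the spilled status as plain arrays and a BitSet.

```java
import java.util.BitSet;

// Hypothetical model of how probe() routes a single probe tuple.
final class ProbeRoutingSketch {
    enum Route { DISCARD, SPILL, JOIN_IN_MEMORY }

    static Route route(int pid, int[] buildTupleCounts, BitSet spilled, boolean isLeftOuter) {
        if (spilled.isEmpty()) {
            return Route.JOIN_IN_MEMORY; // case 1: everything is resident
        }
        if (buildTupleCounts[pid] == 0 && !isLeftOuter) {
            return Route.DISCARD; // case 2-1: no possible match, nothing to emit
        }
        // case 2-2: spill with the partition, or join against the in-memory build side
        return spilled.get(pid) ? Route.SPILL : Route.JOIN_IN_MEMORY;
    }
}
```

For a left outer join, a probe tuple whose build partition is empty is still kept, because it must appear in the output padded with missing values.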
- private void flushBigProbeObjectToDisk(int pid, FrameTupleAccessor accessorProbe, int i)
- throws HyracksDataException {
- if (bigProbeFrameAppender == null) {
- bigProbeFrameAppender = new FrameTupleAppender(new VSizeFrame(ctx));
- }
- RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.PROBE);
- if (!bigProbeFrameAppender.append(accessorProbe, i)) {
- throw new HyracksDataException("The given tuple is too big");
- }
- bigProbeFrameAppender.write(runFileWriter, true);
- }
-
- private boolean isBuildRelAllInMemory() {
- return spilledStatus.nextSetBit(0) < 0;
- }
-
- public void completeProbe(IFrameWriter writer) throws HyracksDataException {
+ void completeProbe(IFrameWriter writer) throws HyracksDataException {
//We do NOT join the spilled partitions here, that decision is made at the descriptor level
//(which join technique to use)
inMemJoiner.completeJoin(writer);
}
- public void releaseResource() throws HyracksDataException {
+ void releaseResource() throws HyracksDataException {
inMemJoiner.closeTable();
- closeAllSpilledPartitions(SIDE.PROBE);
- bufferManager.close();
+ framePool.close();
inMemJoiner = null;
- bufferManager = null;
bufferManagerForHashTable = null;
+ partitionsManager.releaseResources();
}
- /**
- * In case of failure happens, we need to clear up the generated temporary files.
- */
- public void clearProbeTempFiles() throws HyracksDataException {
- for (int i = 0; i < probeRFWriters.length; i++) {
- if (probeRFWriters[i] != null) {
- probeRFWriters[i].erase();
- }
- }
- }
-
- public RunFileReader getBuildRFReader(int pid) throws HyracksDataException {
- return ((buildRFWriters[pid] == null) ? null : (buildRFWriters[pid]).createDeleteOnCloseReader());
- }
-
- public int getBuildPartitionSizeInTup(int pid) {
- return (buildPSizeInTups[pid]);
- }
-
- public RunFileReader getProbeRFReader(int pid) throws HyracksDataException {
- return ((probeRFWriters[pid] == null) ? null : (probeRFWriters[pid]).createDeleteOnCloseReader());
- }
-
- public int getProbePartitionSizeInTup(int pid) {
- return (probePSizeInTups[pid]);
- }
-
- public int getMaxBuildPartitionSize() {
- int max = buildPSizeInTups[0];
- for (int i = 1; i < buildPSizeInTups.length; i++) {
- if (buildPSizeInTups[i] > max) {
- max = buildPSizeInTups[i];
- }
- }
- return max;
- }
-
- public int getMaxProbePartitionSize() {
- int max = probePSizeInTups[0];
- for (int i = 1; i < probePSizeInTups.length; i++) {
- if (probePSizeInTups[i] > max) {
- max = probePSizeInTups[i];
- }
- }
- return max;
- }
-
- public BitSet getPartitionStatus() {
- return spilledStatus;
- }
-
- public void setIsReversed(boolean b) {
+ void setIsReversed(boolean b) {
this.isReversed = b;
}
+
+ HashJoinPartitionsManager getPartitionsManager() {
+ return partitionsManager;
+ }
+
+ private void checkOneFrameReservedPerSpilledPartitions() throws HyracksDataException {
+ //Make sure that there is one frame reserved for each spilled partition.
+ int frameSize = ctx.getInitialFrameSize();
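+ // Widen to long so that large memory budgets do not overflow int arithmetic.
+ long usedBytes = (long) memSizeInFrames * frameSize - partitionsManager.getSizeOfUnusedMemory();
+ if (usedBytes < (long) partitionsManager.getSpilledStatus().cardinality() * frameSize) {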
+ throw new HyracksDataException("There should be at least one frame reserved for each spilled partition.");
+ }
+ }
}
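The check in checkOneFrameReservedPerSpilledPartitions is plain arithmetic: the memory not reported as unused must cover at least one frame per spilled partition. The standalone Java sketch below (hypothetical names, not part of the change) performs the same comparison in long arithmetic. With int arithmetic, a budget of 32768 frames of 128 KiB each is 2^32 bytes and wraps to 0, which would make the check fail spuriously.

```java
// Hypothetical standalone version of the one-frame-per-spilled-partition check.
final class ReservedFrameCheckSketch {
    static boolean hasOneFramePerSpilledPartition(int memSizeInFrames, int frameSize, long unusedBytes,
            int spilledPartitions) {
        // Widen before multiplying so budgets of 2 GiB or more do not overflow int.
        long usedBytes = (long) memSizeInFrames * frameSize - unusedBytes;
        return usedBytes >= (long) spilledPartitions * frameSize;
    }
}
```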
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 2fd17da..7023a93 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -227,12 +227,14 @@
@Override
public void toBytes(DataOutput out) throws IOException {
-
+ // Intentionally empty: this class only holds the build-phase state so that it can be
+ // used during the probe phase.
}
@Override
public void fromBytes(DataInput in) throws IOException {
-
+ // Intentionally empty: this class only holds the build-phase state so that it can be
+ // used during the probe phase.
}
}
@@ -263,7 +265,7 @@
final ITuplePairComparator probComparator =
tuplePairComparatorFactoryProbe2Build.createTuplePairComparator(ctx);
final IPredicateEvaluator predEvaluator =
- (predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator());
+ predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator();
return new AbstractUnaryInputSinkOperatorNodePushable() {
private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(
@@ -305,7 +307,7 @@
if (state.hybridHJ != null) {
state.hybridHJ.closeBuild();
if (isFailed) {
- state.hybridHJ.clearBuildTempFiles();
+ state.hybridHJ.getPartitionsManager().clearBuildTempFiles();
} else {
ctx.setStateObject(state);
if (LOGGER.isTraceEnabled()) {
@@ -369,6 +371,9 @@
IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
private BuildAndPartitionTaskState state;
+ /* TODO: The memory for this buffer currently comes from the Hyracks context, so it is not
+ counted against the memory budget and may push memory usage above the budget. */
private IFrame rPartbuff = new VSizeFrame(ctx);
private FrameTupleAppender nullResultAppender = null;
@@ -381,7 +386,6 @@
new TaskId(new ActivityId(getOperatorId(), BUILD_AND_PARTITION_ACTIVITY_ID), partition));
writer.open();
- state.hybridHJ.initProbe();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("OptimizedHybridHashJoin is starting the probe phase.");
@@ -404,8 +408,8 @@
if (failed) {
try {
// Clear temp files if fail() was called.
- state.hybridHJ.clearBuildTempFiles();
- state.hybridHJ.clearProbeTempFiles();
+ state.hybridHJ.getPartitionsManager().clearBuildTempFiles();
+ state.hybridHJ.getPartitionsManager().clearProbeTempFiles();
} finally {
writer.close(); // writer should always be closed.
}
@@ -418,12 +422,12 @@
} finally {
state.hybridHJ.releaseResource();
}
- BitSet partitionStatus = state.hybridHJ.getPartitionStatus();
+ BitSet partitionStatus = state.hybridHJ.getPartitionsManager().getSpilledStatus();
rPartbuff.reset();
for (int pid = partitionStatus.nextSetBit(0); pid >= 0; pid =
partitionStatus.nextSetBit(pid + 1)) {
- RunFileReader bReader = state.hybridHJ.getBuildRFReader(pid);
- RunFileReader pReader = state.hybridHJ.getProbeRFReader(pid);
+ RunFileReader bReader = state.hybridHJ.getPartitionsManager().getBuildRFReader(pid);
+ RunFileReader pReader = state.hybridHJ.getPartitionsManager().getProbeRFReader(pid);
if (bReader == null || pReader == null) {
if (isLeftOuter && pReader != null) {
@@ -431,8 +435,10 @@
}
continue;
}
- int bSize = state.hybridHJ.getBuildPartitionSizeInTup(pid);
- int pSize = state.hybridHJ.getProbePartitionSizeInTup(pid);
+ int bSize = state.hybridHJ.getPartitionsManager().getTuplesCount(pid,
+ OptimizedHybridHashJoin.SIDE.BUILD);
+ int pSize = state.hybridHJ.getPartitionsManager().getTuplesCount(pid,
+ OptimizedHybridHashJoin.SIDE.PROBE);
joinPartitionPair(bReader, pReader, bSize, pSize, 1);
}
} catch (Exception e) {
@@ -440,8 +446,8 @@
// to send the failure signal to the downstream, when there is a throwable thrown.
writer.fail();
// Clear temp files as this.fail() nor this.close() will no longer be called after close().
- state.hybridHJ.clearBuildTempFiles();
- state.hybridHJ.clearProbeTempFiles();
+ state.hybridHJ.getPartitionsManager().clearBuildTempFiles();
+ state.hybridHJ.getPartitionsManager().clearProbeTempFiles();
// Re-throw the whatever is caught.
throw e;
} finally {
@@ -587,7 +593,6 @@
probeSideReader.open();
rPartbuff.reset();
try {
- rHHj.initProbe();
while (probeSideReader.nextFrame(rPartbuff)) {
rHHj.probe(rPartbuff.getBuffer(), writer);
}
@@ -601,11 +606,13 @@
}
try {
- int maxAfterBuildSize = rHHj.getMaxBuildPartitionSize();
- int maxAfterProbeSize = rHHj.getMaxProbePartitionSize();
+ int maxAfterBuildSize =
+ rHHj.getPartitionsManager().getMaxPartitionSize(OptimizedHybridHashJoin.SIDE.BUILD);
+ int maxAfterProbeSize =
+ rHHj.getPartitionsManager().getMaxPartitionSize(OptimizedHybridHashJoin.SIDE.PROBE);
int afterMax = Math.max(maxAfterBuildSize, maxAfterProbeSize);
- BitSet rPStatus = rHHj.getPartitionStatus();
+ BitSet rPStatus = rHHj.getPartitionsManager().getSpilledStatus();
if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) {
//Case 2.1.1 - Keep applying HHJ
if (LOGGER.isDebugEnabled()) {
@@ -613,10 +620,12 @@
+ "(isLeftOuter || build<probe) - [Level " + level + "]");
}
for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
- RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
- RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
- int rbSizeInTuple = rHHj.getBuildPartitionSizeInTup(rPid);
- int rpSizeInTuple = rHHj.getProbePartitionSizeInTup(rPid);
+ RunFileReader rbrfw = rHHj.getPartitionsManager().getBuildRFReader(rPid);
+ RunFileReader rprfw = rHHj.getPartitionsManager().getProbeRFReader(rPid);
+ int rbSizeInTuple = rHHj.getPartitionsManager().getTuplesCount(rPid,
+ OptimizedHybridHashJoin.SIDE.BUILD);
+ int rpSizeInTuple = rHHj.getPartitionsManager().getTuplesCount(rPid,
+ OptimizedHybridHashJoin.SIDE.PROBE);
if (rbrfw == null || rprfw == null) {
if (isLeftOuter && rprfw != null) {
@@ -639,8 +648,8 @@
+ "(isLeftOuter || build<probe) - [Level " + level + "]");
}
for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
- RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
- RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
+ RunFileReader rbrfw = rHHj.getPartitionsManager().getBuildRFReader(rPid);
+ RunFileReader rprfw = rHHj.getPartitionsManager().getProbeRFReader(rPid);
if (rbrfw == null || rprfw == null) {
if (isLeftOuter && rprfw != null) {
@@ -650,8 +659,10 @@
continue;
}
- int buildSideInTups = rHHj.getBuildPartitionSizeInTup(rPid);
- int probeSideInTups = rHHj.getProbePartitionSizeInTup(rPid);
+ int buildSideInTups = rHHj.getPartitionsManager().getTuplesCount(rPid,
+ OptimizedHybridHashJoin.SIDE.BUILD);
+ int probeSideInTups = rHHj.getPartitionsManager().getTuplesCount(rPid,
+ OptimizedHybridHashJoin.SIDE.PROBE);
// NLJ order is outer + inner, the order is reversed from the other joins
if (isLeftOuter || probeSideInTups < buildSideInTups) {
//checked-modified
@@ -665,14 +676,17 @@
} catch (Exception e) {
// Make sure that temporary run files generated in recursive hybrid hash joins
// are closed and deleted.
- rHHj.clearBuildTempFiles();
- rHHj.clearProbeTempFiles();
+ rHHj.getPartitionsManager().clearBuildTempFiles();
+ rHHj.getPartitionsManager().clearProbeTempFiles();
throw e;
}
}
private void appendNullToProbeTuples(RunFileReader probReader) throws HyracksDataException {
if (nullResultAppender == null) {
+ /*TODO: This buffer is currently allocated from the Hyracks task context,
+ so it is not counted against the operator's memory budget and can push
+ memory usage above that budget.*/
nullResultAppender = new FrameTupleAppender(new VSizeFrame(ctx));
}
if (probeTupleAccessor == null) {
@@ -769,7 +783,9 @@
new NestedLoopJoin(ctx, new FrameTupleAccessor(outerRd), new FrameTupleAccessor(innerRd),
nljComptorOuterInner, memorySize, predEvaluator, isLeftOuter, nonMatchWriter);
nlj.setIsReversed(isReversed);
-
+ /*TODO: This buffer is currently allocated from the Hyracks task context,
+ so it is not counted against the operator's memory budget and can push
+ memory usage above that budget.*/
IFrame cacheBuff = new VSizeFrame(ctx);
try {
innerReader.open();
@@ -785,6 +801,9 @@
}
}
try {
+ /*TODO: This buffer is currently allocated from the Hyracks task context,
+ so it is not counted against the operator's memory budget and can push
+ memory usage above that budget.*/
IFrame joinBuff = new VSizeFrame(ctx);
outerReader.open();
try {
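The hunks above walk `getSpilledStatus()` with the `BitSet.nextSetBit` idiom and, one recursion level down, decide once per level whether to keep applying hybrid hash join (`afterMax < NLJ_SWITCH_THRESHOLD * beforeMax` and not `forceNLJ`) or to fall back to nested-loop join, then handle each spilled partition accordingly. A minimal sketch of that decision with all I/O stripped out, under stated assumptions: `RecursionPlanSketch`, the `Plan` enum, and the value 0.8 for `NLJ_SWITCH_THRESHOLD` are hypothetical and chosen for illustration; only the boolean conditions are copied from the diff.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

// Hypothetical, I/O-free model of the per-level decision in the recursive join.
// Not Hyracks code: names and the threshold value are assumptions.
class RecursionPlanSketch {
    enum Plan { SKIP, APPEND_NULLS_TO_PROBE, RECURSE_HHJ, NLJ_PROBE_AS_OUTER, NLJ_BUILD_AS_OUTER }

    // Assumed value for illustration only.
    static final double NLJ_SWITCH_THRESHOLD = 0.8;

    // Mirrors: !forceNLJ && afterMax < NLJ_SWITCH_THRESHOLD * beforeMax
    static boolean keepApplyingHhj(boolean forceNlj, int beforeMax, int maxAfterBuild, int maxAfterProbe) {
        int afterMax = Math.max(maxAfterBuild, maxAfterProbe);
        return !forceNlj && afterMax < NLJ_SWITCH_THRESHOLD * beforeMax;
    }

    // Collects spilled partition ids the same way the diff iterates the BitSet.
    static List<Integer> spilledPartitions(BitSet spilledStatus) {
        List<Integer> pids = new ArrayList<>();
        for (int pid = spilledStatus.nextSetBit(0); pid >= 0; pid = spilledStatus.nextSetBit(pid + 1)) {
            pids.add(pid);
        }
        return pids;
    }

    // hasBuildRun / hasProbeRun stand in for the "reader != null" checks in the diff.
    static Plan planPartition(boolean hasBuildRun, boolean hasProbeRun, boolean isLeftOuter,
            boolean keepHhj, int buildTuples, int probeTuples) {
        if (!hasBuildRun || !hasProbeRun) {
            // Only a left outer join with probe tuples left over still produces output.
            return (isLeftOuter && hasProbeRun) ? Plan.APPEND_NULLS_TO_PROBE : Plan.SKIP;
        }
        if (keepHhj) {
            return Plan.RECURSE_HHJ;
        }
        // Same condition as the diff: the probe side is passed as the NLJ outer input
        // when the join is left outer or the probe side has fewer tuples.
        return (isLeftOuter || probeTuples < buildTuples) ? Plan.NLJ_PROBE_AS_OUTER : Plan.NLJ_BUILD_AS_OUTER;
    }

    public static void main(String[] args) {
        BitSet spilled = new BitSet();
        spilled.set(2);
        spilled.set(5);
        boolean keepHhj = keepApplyingHhj(false, 100, 90, 40); // 90 is not below 0.8 * 100, so switch to NLJ
        for (int pid : spilledPartitions(spilled)) {
            System.out.println(pid + " -> " + planPartition(true, true, false, keepHhj, 1000, 200));
        }
    }
}
```

Computing `keepApplyingHhj` once and then planning each partition matches the structure of the diff, where the threshold test wraps two separate per-partition loops.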
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
index e6da7c9..71e5fd0 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
@@ -48,15 +48,16 @@
ISimpleFrameBufferManager bufferManager, double garbageCollectionThreshold) throws HyracksDataException {
super(tableSize, ctx, false);
this.bufferManager = bufferManager;
-
- ByteBuffer newFrame = getFrame(frameSize);
- if (newFrame == null) {
- throw new HyracksDataException("Can't allocate a frame for Hash Table. Please allocate more budget.");
+ if (tableSize > 0) {
+ ByteBuffer newFrame = getFrame(frameSize);
+ if (newFrame == null) {
+ throw new HyracksDataException("Can't allocate a frame for Hash Table. Please allocate more budget.");
+ }
+ IntSerDeBuffer frame = new IntSerDeBuffer(newFrame);
+ frameCapacity = frame.capacity();
+ contents.add(frame);
+ currentOffsetInEachFrameList.add(0);
}
- IntSerDeBuffer frame = new IntSerDeBuffer(newFrame);
- frameCapacity = frame.capacity();
- contents.add(frame);
- currentOffsetInEachFrameList.add(0);
this.garbageCollectionThreshold = garbageCollectionThreshold;
}
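Before this change the `SerializableHashTable` constructor requested its first frame unconditionally; with the `tableSize > 0` guard, a table created with size 0 no longer takes a frame from the budget (or fails when none is left). A minimal sketch of just that guard, under stated assumptions: `FrameBudgetSketch` and `LazyFirstFrameTableSketch` are hypothetical stand-ins, and `IllegalStateException` replaces `HyracksDataException` so the example has no Hyracks dependency.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical frame budget: hands out at most `remaining` frames, then returns null,
// loosely mirroring how getFrame(frameSize) can return null in the diff.
class FrameBudgetSketch {
    private int remaining;

    FrameBudgetSketch(int frames) {
        this.remaining = frames;
    }

    ByteBuffer getFrame(int frameSize) {
        if (remaining == 0) {
            return null;
        }
        remaining--;
        return ByteBuffer.allocate(frameSize);
    }

    int remaining() {
        return remaining;
    }
}

// Hypothetical table showing only the constructor guard: the first frame
// is requested only when tableSize > 0.
class LazyFirstFrameTableSketch {
    private final List<ByteBuffer> contents = new ArrayList<>();

    LazyFirstFrameTableSketch(int tableSize, int frameSize, FrameBudgetSketch budget) {
        if (tableSize > 0) {
            ByteBuffer newFrame = budget.getFrame(frameSize);
            if (newFrame == null) {
                // The real code throws HyracksDataException; a JDK exception keeps this self-contained.
                throw new IllegalStateException("Can't allocate a frame for Hash Table. Please allocate more budget.");
            }
            contents.add(newFrame);
        }
    }

    int framesHeld() {
        return contents.size();
    }

    public static void main(String[] args) {
        FrameBudgetSketch empty = new FrameBudgetSketch(0);
        // A zero-sized table asks for no frame, so an exhausted budget is not an error.
        System.out.println(new LazyFirstFrameTableSketch(0, 32768, empty).framesHeld()); // prints 0
        try {
            new LazyFirstFrameTableSketch(16, 32768, empty);
        } catch (IllegalStateException e) {
            System.out.println("non-empty table still needs a frame: " + e.getMessage());
        }
    }
}
```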
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/join/HashJoinPartitionsManagerTest.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/join/HashJoinPartitionsManagerTest.java
new file mode 100644
index 0000000..bbea993
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/join/HashJoinPartitionsManagerTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.join;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import org.apache.hyracks.api.context.IHyracksJobletContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.io.RunFileWriter;
+import org.apache.hyracks.dataflow.std.buffermanager.PreferToSpillFullyOccupiedFramePolicy;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class HashJoinPartitionsManagerTest {
+ HashJoinPartitionsManager partitionsManager;
+ static IHyracksTaskContext ctx;
+ String buildRel = "BUILD";
+ String probeRel = "PROBE";
+ static FrameTupleAccessor accessorBuild;
+ static FrameTupleAccessor accessorProbe;
+ static IHyracksJobletContext jobletContext;
+ static IOManager ioManager;
+ static FileReference file;
+
+ @BeforeClass
+ public static void classSetUp() throws HyracksDataException {
+ file = Mockito.mock(FileReference.class);
+ ioManager = Mockito.mock(IOManager.class);
+ ctx = Mockito.mock(IHyracksTaskContext.class);
+ jobletContext = Mockito.mock(IHyracksJobletContext.class);
+ Mockito.when(jobletContext.createManagedWorkspaceFile(Mockito.anyString())).thenReturn(file);
+ Mockito.when(ctx.getJobletContext()).thenReturn(jobletContext);
+ Mockito.when(ctx.getIoManager()).thenReturn(ioManager);
+ accessorBuild = Mockito.mock(FrameTupleAccessor.class);
+ accessorProbe = Mockito.mock(FrameTupleAccessor.class);
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ partitionsManager = new HashJoinPartitionsManager(ctx, 10, 8, accessorBuild, accessorProbe, buildRel, probeRel);
+ partitionsManager.init();
+ }
+
+ @Test
+ public void incrementTupleCount_Build() throws HyracksDataException {
+ assert partitionsManager.getTuplesCount(1, OptimizedHybridHashJoin.SIDE.BUILD) == 0;
+ assert partitionsManager.getTuplesCount(1, OptimizedHybridHashJoin.SIDE.PROBE) == 0;
+ for (int i = 0; i < 5; i++) {
+ partitionsManager.incrementTupleCount(1, OptimizedHybridHashJoin.SIDE.BUILD);
+ }
+ assert partitionsManager.getTuplesCount(1, OptimizedHybridHashJoin.SIDE.BUILD) == 5;
+ assert partitionsManager.getTuplesCount(1, OptimizedHybridHashJoin.SIDE.PROBE) == 0;
+ }
+
+ @Test
+ public void incrementTupleCount_Probe() throws HyracksDataException {
+ assert partitionsManager.getTuplesCount(1, OptimizedHybridHashJoin.SIDE.BUILD) == 0;
+ assert partitionsManager.getTuplesCount(1, OptimizedHybridHashJoin.SIDE.PROBE) == 0;
+ for (int i = 0; i < 5; i++) {
+ partitionsManager.incrementTupleCount(1, OptimizedHybridHashJoin.SIDE.PROBE);
+ }
+ assert partitionsManager.getTuplesCount(1, OptimizedHybridHashJoin.SIDE.PROBE) == 5;
+ assert partitionsManager.getTuplesCount(1, OptimizedHybridHashJoin.SIDE.BUILD) == 0;
+ }
+
+ @Test(expected = HyracksDataException.class)
+ public void closeBuildPartitionWriter() throws HyracksDataException {
+ partitionsManager.closeBuildPartitionWriter(1);
+ }
+
+ @Test
+ public void clearBuildTempFiles() throws HyracksDataException {
+ RunFileWriter rfw = Mockito.mock(RunFileWriter.class);
+ partitionsManager.setRunFileWriter(1, OptimizedHybridHashJoin.SIDE.BUILD, rfw);
+ partitionsManager.clearBuildTempFiles();
+ Mockito.verify(rfw).erase();
+ }
+
+ @Test
+ public void clearProbeTempFiles() throws HyracksDataException {
+ RunFileWriter rfw = Mockito.mock(RunFileWriter.class);
+ partitionsManager.setRunFileWriter(1, OptimizedHybridHashJoin.SIDE.PROBE, rfw);
+ partitionsManager.clearProbeTempFiles();
+ Mockito.verify(rfw).erase();
+ }
+
+ @Test(expected = HyracksDataException.class)
+ public void selectAndSpillVictim_NoSpaceLeftException() throws Throwable {
+ try {
+ Class<?> cls = partitionsManager.getClass();
+ Method method = cls.getDeclaredMethod("selectAndSpillVictim", int.class);
+ method.setAccessible(true);
+ Field spillPolicy = cls.getDeclaredField("spillPolicy");
+ spillPolicy.setAccessible(true);
+ PreferToSpillFullyOccupiedFramePolicy spp =
+ (PreferToSpillFullyOccupiedFramePolicy) spillPolicy.get(partitionsManager);
+ method.invoke(partitionsManager, 2);
+ Mockito.verify(spp).selectVictimPartition(2);
+ } catch (InvocationTargetException e) {
+ throw e.getTargetException();
+ }
+ }
+}
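`HashJoinPartitionsManagerTest` expects tuple counts to be tracked independently per partition and per side, and `clearBuildTempFiles()` / `clearProbeTempFiles()` to call `erase()` on the run-file writers registered for that side. A minimal, hypothetical model of that bookkeeping follows; `PartitionBookkeepingSketch`, `TempFile`, and `clearTempFiles(Side)` are illustrative names, not the `HashJoinPartitionsManager` API, and the partition count of 10 is arbitrary.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical model of per-partition, per-side bookkeeping; not the real manager.
class PartitionBookkeepingSketch {
    enum Side { BUILD, PROBE }

    // Stand-in for RunFileWriter; only erase() matters here.
    interface TempFile {
        void erase();
    }

    private final Map<Side, int[]> tupleCounts = new EnumMap<>(Side.class);
    private final Map<Side, TempFile[]> runFiles = new EnumMap<>(Side.class);

    PartitionBookkeepingSketch(int numPartitions) {
        for (Side side : Side.values()) {
            tupleCounts.put(side, new int[numPartitions]);
            runFiles.put(side, new TempFile[numPartitions]);
        }
    }

    void incrementTupleCount(int pid, Side side) {
        tupleCounts.get(side)[pid]++;
    }

    int getTuplesCount(int pid, Side side) {
        return tupleCounts.get(side)[pid];
    }

    void setRunFile(int pid, Side side, TempFile file) {
        runFiles.get(side)[pid] = file;
    }

    // Erases every run file of one side and forgets it, so a second call is a no-op.
    void clearTempFiles(Side side) {
        TempFile[] files = runFiles.get(side);
        for (int pid = 0; pid < files.length; pid++) {
            if (files[pid] != null) {
                files[pid].erase();
                files[pid] = null;
            }
        }
    }

    public static void main(String[] args) {
        PartitionBookkeepingSketch pm = new PartitionBookkeepingSketch(10);
        for (int i = 0; i < 5; i++) {
            pm.incrementTupleCount(1, Side.BUILD);
        }
        System.out.println(pm.getTuplesCount(1, Side.BUILD) + " " + pm.getTuplesCount(1, Side.PROBE)); // prints "5 0"
        pm.setRunFile(1, Side.BUILD, () -> System.out.println("erased build run of partition 1"));
        pm.clearTempFiles(Side.BUILD);
    }
}
```

Keeping one array per side makes a BUILD increment invisible to PROBE, which is what `incrementTupleCount_Build` and `incrementTupleCount_Probe` assert.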
--
To view, visit https://asterix-gerrit.ics.uci.edu/3454
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-MessageType: newchange
Gerrit-Change-Id: I25e26bb499a42077a611ebbebf56075704fc5e74
Gerrit-Change-Number: 3454
Gerrit-PatchSet: 1
Gerrit-Owner: Shiva Jahangiri <sh...@uci.edu>
Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.
Posted by "Jenkins (Code Review)" <de...@asterixdb.apache.org>.
Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )
Patch Set 1: Contrib+1
BAD Compatibility Tests Successful
https://asterix-jenkins.ics.uci.edu/job/asterixbad-compat/4592/ : SUCCESS
--
Gerrit-MessageType: comment
Gerrit-Reviewer: Anon. E. Moose (1000171)
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Comment-Date: Sun, 23 Jun 2019 02:00:23 +0000
Gerrit-HasComments: No
Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.
Posted by "Jenkins (Code Review)" <de...@asterixdb.apache.org>.
Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )
Patch Set 1:
Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-cancellation-test/6014/ (2/16)
Gerrit-Comment-Date: Sun, 23 Jun 2019 01:42:16 +0000
Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.
Posted by "Jenkins (Code Review)" <de...@asterixdb.apache.org>.
Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )
Patch Set 1:
Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-verify-no-installer-app/6151/ (15/16)
Gerrit-Comment-Date: Sun, 23 Jun 2019 01:42:34 +0000
Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.
Posted by "Jenkins (Code Review)" <de...@asterixdb.apache.org>.
Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )
Patch Set 1:
Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-ssl-compression/768/ (16/16)
Gerrit-Comment-Date: Sun, 23 Jun 2019 01:42:36 +0000
Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.
Posted by "Jenkins (Code Review)" <de...@asterixdb.apache.org>.
Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )
Patch Set 1:
Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-sonar/9955/ (7/16)
Gerrit-Comment-Date: Sun, 23 Jun 2019 01:42:19 +0000
Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.
Posted by "Jenkins (Code Review)" <de...@asterixdb.apache.org>.
Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )
Patch Set 1:
Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-verify-storage/6573/ (8/16)
Gerrit-Comment-Date: Sun, 23 Jun 2019 01:42:20 +0000
Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.
Posted by "Anon. E. Moose (Code Review)" <de...@asterixdb.apache.org>.
Anon. E. Moose (1000171) has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )
Patch Set 1:
Analytics Compatibility Compilation Successful
https://cbjenkins.page.link/wUJxRFW7ZxXHMBLh6 : SUCCESS
Gerrit-Comment-Date: Sun, 23 Jun 2019 01:46:28 +0000
Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.
Posted by "Jenkins (Code Review)" <de...@asterixdb.apache.org>.
Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )
Patch Set 1:
Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-asterix-app-openjdk11/1358/ (6/16)
Gerrit-Comment-Date: Sun, 23 Jun 2019 01:42:18 +0000
Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.
Posted by "Jenkins (Code Review)" <de...@asterixdb.apache.org>.
Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )
Patch Set 1:
Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-verify-txnlog/1169/ (11/16)
Gerrit-Comment-Date: Sun, 23 Jun 2019 01:42:25 +0000
Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.
Posted by "Jenkins (Code Review)" <de...@asterixdb.apache.org>.
Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )
Patch Set 1:
Integration Tests Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-integration-tests/8767/
Gerrit-Comment-Date: Sun, 23 Jun 2019 01:45:44 +0000
Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.
Posted by "Jenkins (Code Review)" <de...@asterixdb.apache.org>.
Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )
Patch Set 1:
BAD Compatibility Tests Started https://asterix-jenkins.ics.uci.edu/job/asterixbad-compat/4592/
Gerrit-Comment-Date: Sun, 23 Jun 2019 01:45:42 +0000
Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.
Posted by "Jenkins (Code Review)" <de...@asterixdb.apache.org>.
Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )
Patch Set 1:
Build Started https://asterix-jenkins.ics.uci.edu/job/hyracks-gerrit/5910/ (1/16)
Gerrit-Comment-Date: Sun, 23 Jun 2019 01:42:16 +0000
Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.
Posted by "Jenkins (Code Review)" <de...@asterixdb.apache.org>.
Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )
Patch Set 1:
Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-asterix-app-sql-execution/5991/ (13/16)
Gerrit-Comment-Date: Sun, 23 Jun 2019 01:42:30 +0000
Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.
Posted by "Jenkins (Code Review)" <de...@asterixdb.apache.org>.
Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )
Patch Set 1:
Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-spidersilk-tests/928/ (14/16)
Gerrit-Comment-Date: Sun, 23 Jun 2019 01:42:32 +0000
Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.
Posted by "Jenkins (Code Review)" <de...@asterixdb.apache.org>.
Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )
Patch Set 1: Integration-Tests+1
Integration Tests Successful
https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-integration-tests/8767/ : SUCCESS
Gerrit-Comment-Date: Sun, 23 Jun 2019 03:00:26 +0000
Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.
Posted by "Jenkins (Code Review)" <de...@asterixdb.apache.org>.
Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )
Patch Set 1:
Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-verify-asterix-app/6362/ (12/16)
Gerrit-Comment-Date: Sun, 23 Jun 2019 01:42:27 +0000
Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.
Posted by "Jenkins (Code Review)" <de...@asterixdb.apache.org>.
Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )
Patch Set 1:
Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-ensure-ancestor/3996/ (9/16)
Gerrit-Comment-Date: Sun, 23 Jun 2019 01:42:22 +0000
Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.
Posted by "Jenkins (Code Review)" <de...@asterixdb.apache.org>.
Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )
Patch Set 1:
Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-source-format/5974/ (3/16)
Gerrit-Comment-Date: Sun, 23 Jun 2019 01:42:17 +0000
Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.
Posted by "Jenkins (Code Review)" <de...@asterixdb.apache.org>.
Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )
Patch Set 1:
Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-source-assemblies/6212/ (5/16)
Gerrit-Comment-Date: Sun, 23 Jun 2019 01:42:17 +0000
Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.
Posted by "Jenkins (Code Review)" <de...@asterixdb.apache.org>.
Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )
Change subject: NoGrow-NoSteal algorithm for large object in hybrid hash join.
......................................................................
Patch Set 1:
Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-stabilization-f69489-compat/1262/ (10/16)
--
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Comment-Date: Sun, 23 Jun 2019 01:42:23 +0000
Gerrit-HasComments: No
Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.
Posted by "Anon. E. Moose (Code Review)" <de...@asterixdb.apache.org>.
Anon. E. Moose (1000171) has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )
Change subject: NoGrow-NoSteal algorithm for large object in hybrid hash join.
......................................................................
Patch Set 1: Contrib-2
Analytics Compatibility Tests Failed
https://cbjenkins.page.link/AVzcjPAGtg1j35dc6 : UNSTABLE
--
Gerrit-Reviewer: Anon. E. Moose (1000171)
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Comment-Date: Sun, 23 Jun 2019 03:11:56 +0000
Gerrit-HasComments: No
Change in asterixdb[master]: NoGrow-NoSteal algorithm for large object in hybrid hash join.
Posted by "Jenkins (Code Review)" <de...@asterixdb.apache.org>.
Jenkins has posted comments on this change. ( https://asterix-gerrit.ics.uci.edu/3454 )
Change subject: NoGrow-NoSteal algorithm for large object in hybrid hash join.
......................................................................
Patch Set 1:
Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-notopic/11482/ (4/16)
--
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Comment-Date: Sun, 23 Jun 2019 01:42:17 +0000
Gerrit-HasComments: No