You are viewing a plain-text version of this content. The canonical (linked) version is available in the Apache mailing-list archive for commits@asterixdb.apache.org.
Posted to commits@asterixdb.apache.org by pr...@apache.org on 2016/10/17 19:55:33 UTC
[43/50] [abbrv] asterixdb git commit: new partition join algorithm
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0b900514/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoinOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoinOperatorDescriptor.java
deleted file mode 100644
index a985eee..0000000
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoinOperatorDescriptor.java
+++ /dev/null
@@ -1,319 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.runtime.operators.joins.intervalpartition2;
-
-import java.nio.ByteBuffer;
-import java.util.logging.Logger;
-
-import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinChecker;
-import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinCheckerFactory;
-import org.apache.asterix.runtime.operators.joins.intervalpartition.IntervalPartitionComputerFactory;
-import org.apache.asterix.runtime.operators.joins.intervalpartition.IntervalPartitionUtil;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.ActivityId;
-import org.apache.hyracks.api.dataflow.IActivity;
-import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.TaskId;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
-import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-import org.apache.hyracks.dataflow.std.base.RangeId;
-import org.apache.hyracks.dataflow.std.join.MergeBranchStatus.Stage;
-import org.apache.hyracks.dataflow.std.join.MergeJoinLocks;
-import org.apache.hyracks.dataflow.std.misc.RangeForwardOperatorDescriptor.RangeForwardTaskState;
-
-public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDescriptor {
- private static final long serialVersionUID = 1L;
-
- private static final int LEFT_ACTIVITY_ID = 0;
- private static final int RIGHT_ACTIVITY_ID = 1;
- private final int[] leftKeys;
- private final int[] rightKeys;
- private final int memoryForJoin;
- private final IIntervalMergeJoinCheckerFactory imjcf;
- private final RangeId rangeId;
- private final int k;
-
- private final int probeKey;
- private final int buildKey;
-
- private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoinOperatorDescriptor.class.getName());
-
- public IntervalPartitionJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memoryForJoin, int k,
- int[] leftKeys, int[] rightKeys, RecordDescriptor recordDescriptor, IIntervalMergeJoinCheckerFactory imjcf,
- RangeId rangeId) {
- super(spec, 2, 1);
- recordDescriptors[0] = recordDescriptor;
- this.buildKey = leftKeys[0];
- this.probeKey = rightKeys[0];
- this.k = k;
- this.leftKeys = leftKeys;
- this.rightKeys = rightKeys;
- this.memoryForJoin = memoryForJoin;
- this.imjcf = imjcf;
- this.rangeId = rangeId;
- }
-
- @Override
- public void contributeActivities(IActivityGraphBuilder builder) {
- MergeJoinLocks locks = new MergeJoinLocks();
-
- ActivityId leftAid = new ActivityId(odId, LEFT_ACTIVITY_ID);
- ActivityId rightAid = new ActivityId(odId, RIGHT_ACTIVITY_ID);
-
- IActivity leftAN = new LeftJoinerActivityNode(leftAid, rightAid, locks);
- IActivity rightAN = new RightDataActivityNode(rightAid, leftAid, locks);
-
- builder.addActivity(this, rightAN);
- builder.addSourceEdge(1, rightAN, 0);
-
- builder.addActivity(this, leftAN);
- builder.addSourceEdge(0, leftAN, 0);
- builder.addTargetEdge(0, leftAN, 0);
- }
-
- private class LeftJoinerActivityNode extends AbstractActivityNode {
- private static final long serialVersionUID = 1L;
-
- private final MergeJoinLocks locks;
-
- public LeftJoinerActivityNode(ActivityId id, ActivityId joinAid, MergeJoinLocks locks) {
- super(id);
- this.locks = locks;
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
- throws HyracksDataException {
- locks.setPartitions(nPartitions);
- final RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
- return new LeftJoinerOperator(ctx, partition, inRecordDesc);
- }
-
- private class LeftJoinerOperator extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
-
- private final IHyracksTaskContext ctx;
- private final int partition;
- private final RecordDescriptor leftRd;
- private IntervalPartitionJoinTaskState state;
- private boolean first = true;
-
- public LeftJoinerOperator(IHyracksTaskContext ctx, int partition, RecordDescriptor inRecordDesc) {
- this.ctx = ctx;
- this.partition = partition;
- this.leftRd = inRecordDesc;
- }
-
- @Override
- public void open() throws HyracksDataException {
- locks.getLock(partition).lock();
- try {
- writer.open();
- state = new IntervalPartitionJoinTaskState(ctx.getJobletContext().getJobId(),
- new TaskId(getActivityId(), partition));;
- state.leftRd = leftRd;
- ctx.setStateObject(state);
- locks.getRight(partition).signal();
-
- do {
- // Continue after joiner created in right branch.
- if (state.partitionJoiner == null) {
- locks.getLeft(partition).await();
- }
- } while (state.partitionJoiner == null);
- state.status.branch[LEFT_ACTIVITY_ID].setStageOpen();
- locks.getRight(partition).signal();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } finally {
- locks.getLock(partition).unlock();
- }
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- locks.getLock(partition).lock();
- if (first) {
- state.status.branch[LEFT_ACTIVITY_ID].setStageData();
- first = false;
- }
- try {
- state.partitionJoiner.setFrame(LEFT_ACTIVITY_ID, buffer);
- state.partitionJoiner.processLeftFrame(writer);
- } finally {
- locks.getLock(partition).unlock();
- }
- }
-
- @Override
- public void fail() throws HyracksDataException {
- locks.getLock(partition).lock();
- try {
- state.failed = true;
- } finally {
- locks.getLock(partition).unlock();
- }
- }
-
- @Override
- public void close() throws HyracksDataException {
- locks.getLock(partition).lock();
- try {
- state.status.branch[LEFT_ACTIVITY_ID].noMore();
- if (state.failed) {
- writer.fail();
- } else {
- state.partitionJoiner.processLeftClose(writer);
- writer.close();
- }
- state.status.branch[LEFT_ACTIVITY_ID].setStageClose();
- locks.getRight(partition).signal();
- } finally {
- locks.getLock(partition).unlock();
- }
- }
- }
- }
-
- private class RightDataActivityNode extends AbstractActivityNode {
- private static final long serialVersionUID = 1L;
-
- private final ActivityId joinAid;
- private final MergeJoinLocks locks;
-
- public RightDataActivityNode(ActivityId id, ActivityId joinAid, MergeJoinLocks locks) {
- super(id);
- this.joinAid = joinAid;
- this.locks = locks;
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
- throws HyracksDataException {
- locks.setPartitions(nPartitions);
- RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
- return new RightDataOperator(ctx, partition, inRecordDesc);
- }
-
- private class RightDataOperator extends AbstractUnaryInputSinkOperatorNodePushable {
-
- private int partition;
- private IHyracksTaskContext ctx;
- private final RecordDescriptor rightRd;
- private IntervalPartitionJoinTaskState state;
- private boolean first = true;
-
- public RightDataOperator(IHyracksTaskContext ctx, int partition, RecordDescriptor inRecordDesc) {
- this.ctx = ctx;
- this.partition = partition;
- this.rightRd = inRecordDesc;
- }
-
- @Override
- public void open() throws HyracksDataException {
- locks.getLock(partition).lock();
- try {
- do {
- // Wait for the state to be set in the context form Left.
- state = (IntervalPartitionJoinTaskState) ctx.getStateObject(new TaskId(joinAid, partition));
- if (state == null) {
- locks.getRight(partition).await();
- }
- } while (state == null);
- state.k = k;
-
- RangeForwardTaskState rangeState = RangeForwardTaskState.getRangeState(rangeId.getId(), ctx);
- long partitionStart = IntervalPartitionUtil.getStartOfPartition(rangeState.getRangeMap(),
- partition);
- long partitionEnd = IntervalPartitionUtil.getEndOfPartition(rangeState.getRangeMap(), partition);
- ITuplePartitionComputer buildHpc = new IntervalPartitionComputerFactory(buildKey, state.k,
- partitionStart, partitionEnd).createPartitioner();
- ITuplePartitionComputer probeHpc = new IntervalPartitionComputerFactory(probeKey, state.k,
- partitionStart, partitionEnd).createPartitioner();
- IIntervalMergeJoinChecker imjc = imjcf.createMergeJoinChecker(leftKeys, rightKeys, partition, ctx);
-
- state.rightRd = rightRd;
- state.partitionJoiner = new IntervalPartitionJoiner(ctx, memoryForJoin, partition, state.k,
- state.status, locks, imjc, state.leftRd, state.rightRd, buildHpc, probeHpc);
- state.status.branch[RIGHT_ACTIVITY_ID].setStageOpen();
- locks.getLeft(partition).signal();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } finally {
- locks.getLock(partition).unlock();
- }
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- locks.getLock(partition).lock();
- if (first) {
- state.status.branch[RIGHT_ACTIVITY_ID].setStageData();
- first = false;
- }
- try {
- while (!state.status.continueRightLoad
- && state.status.branch[LEFT_ACTIVITY_ID].getStatus() != Stage.CLOSED) {
- // Wait for the state to request right frame unless left has finished.
- locks.getRight(partition).await();
- }
- state.partitionJoiner.setFrame(RIGHT_ACTIVITY_ID, buffer);
- state.status.continueRightLoad = false;
- locks.getLeft(partition).signal();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } finally {
- locks.getLock(partition).unlock();
- }
- }
-
- @Override
- public void fail() throws HyracksDataException {
- locks.getLock(partition).lock();
- try {
- state.failed = true;
- locks.getLeft(partition).signal();
- } finally {
- locks.getLock(partition).unlock();
- }
- }
-
- @Override
- public void close() throws HyracksDataException {
- locks.getLock(partition).lock();
- try {
- state.status.branch[RIGHT_ACTIVITY_ID].setStageClose();
- locks.getLeft(partition).signal();
- } finally {
- locks.getLock(partition).unlock();
- }
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0b900514/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoinTaskState.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoinTaskState.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoinTaskState.java
deleted file mode 100644
index e8563c2..0000000
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoinTaskState.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.operators.joins.intervalpartition2;
-
-import org.apache.hyracks.api.dataflow.TaskId;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.dataflow.std.join.MergeJoinTaskState;
-
-public class IntervalPartitionJoinTaskState extends MergeJoinTaskState {
- protected IntervalPartitionJoiner partitionJoiner;
- public int k;
-
- public IntervalPartitionJoinTaskState(JobId jobId, TaskId taskId) {
- super(jobId, taskId);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0b900514/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoiner.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoiner.java
deleted file mode 100644
index fb2edd7..0000000
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition2/IntervalPartitionJoiner.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.operators.joins.intervalpartition2;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.TreeMap;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinChecker;
-import org.apache.asterix.runtime.operators.joins.intervalpartition.IntervalPartitionUtil;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import org.apache.hyracks.dataflow.common.io.RunFileReader;
-import org.apache.hyracks.dataflow.common.io.RunFileWriter;
-import org.apache.hyracks.dataflow.std.buffermanager.BufferInfo;
-import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager;
-import org.apache.hyracks.dataflow.std.buffermanager.VPartitionTupleBufferManager;
-import org.apache.hyracks.dataflow.std.join.AbstractMergeJoiner;
-import org.apache.hyracks.dataflow.std.join.MergeJoinLocks;
-import org.apache.hyracks.dataflow.std.join.MergeStatus;
-import org.apache.hyracks.dataflow.std.structures.RunFilePointer;
-import org.apache.hyracks.dataflow.std.structures.TuplePointer;
-
-public class IntervalPartitionJoiner extends AbstractMergeJoiner {
-
- private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoiner.class.getName());
-
- private final RunFileWriter probeRunFileWriter;
- private int probeRunFilePid = -1;
-
- private final ITuplePartitionComputer buildHpc;
- private final ITuplePartitionComputer probeHpc;
-
- private final int buildMemory;
- private final int k;
- private final int numOfPartitions;
- private long buildSize = 0;
- private long probeSize = 0;
- private final TreeMap<RunFilePointer, Integer> probeRunFilePointers;
-
- private final VPartitionTupleBufferManager buildBufferManager;
- private final TuplePointer tempPtr = new TuplePointer();
- private final List<Integer> buildInMemoryPartitions;
- private final FrameTupleAccessor accessorBuild;
- private BufferInfo bufferInfo;
-
- private long spillWriteCount = 0;
- private long spillReadCount = 0;
- private long joinComparisonCount = 0;
- private long joinResultCount = 0;
- private final IIntervalMergeJoinChecker imjc;
- private final FrameTupleAccessor accessorProbe;
- private final IFrame reloadBuffer;
- private boolean moreBuildProcessing = true;
- private final List<IFrameBufferManager> fbms = new ArrayList<>();
-
- public IntervalPartitionJoiner(IHyracksTaskContext ctx, int memorySize, int partition, int k, MergeStatus status,
- MergeJoinLocks locks, IIntervalMergeJoinChecker imjc, RecordDescriptor leftRd, RecordDescriptor rightRd,
- ITuplePartitionComputer buildHpc, ITuplePartitionComputer probeHpc) throws HyracksDataException {
- super(ctx, partition, status, locks, leftRd, rightRd);
-
- bufferInfo = new BufferInfo(null, -1, -1);
-
- this.accessorProbe = new FrameTupleAccessor(leftRd);
- reloadBuffer = new VSizeFrame(ctx);
-
- this.numOfPartitions = IntervalPartitionUtil.getMaxPartitions(k);;
- this.imjc = imjc;
-
- // TODO fix available memory size
- this.buildMemory = memorySize;
- buildBufferManager = new VPartitionTupleBufferManager(ctx, VPartitionTupleBufferManager.NO_CONSTRAIN,
- numOfPartitions, buildMemory * ctx.getInitialFrameSize());
-
- this.k = k;
- this.buildHpc = buildHpc;
- this.probeHpc = probeHpc;
-
- FileReference file = ctx.getJobletContext().createManagedWorkspaceFile("IntervalPartitionJoiner");
- probeRunFileWriter = new RunFileWriter(file, ctx.getIOManager());
- probeRunFileWriter.open();
-
- probeRunFilePointers = new TreeMap<>(RunFilePointer.ASC);
- buildInMemoryPartitions = new LinkedList<>();
-
- this.accessorBuild = new FrameTupleAccessor(rightRd);
-
- LOGGER.setLevel(Level.FINE);
- System.out.println("IntervalIndexJoiner: Logging level is: " + LOGGER.getLevel());
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine("IntervalIndexJoiner has started partition " + partition + " with " + memorySize
- + " frames of memory.");
- }
- }
-
- @Override
- public void processLeftFrame(IFrameWriter writer) throws HyracksDataException {
- while (inputAccessor[LEFT_PARTITION].exists()) {
- int pid = probeHpc.partition(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(), k);
-
- if (probeRunFilePid != pid) {
- // Log new partition locations.
- RunFilePointer rfp = new RunFilePointer(probeRunFileWriter.getFileSize(),
- inputAccessor[LEFT_PARTITION].getTupleId());
- probeRunFilePointers.put(rfp, pid);
- probeRunFilePid = pid;
- }
- inputAccessor[LEFT_PARTITION].next();
- probeSize++;
- }
- inputBuffer[LEFT_PARTITION].rewind();
- probeRunFileWriter.nextFrame(inputBuffer[LEFT_PARTITION]);
- spillWriteCount++;
- }
-
- @Override
- public void processLeftClose(IFrameWriter writer) throws HyracksDataException {
- joinLoopOnMemory(writer);
-
- // Flush result.
- resultAppender.write(writer, true);
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("IntervalPartitionJoiner statitics: " + k + " k, " + joinComparisonCount + " comparisons, "
- + joinResultCount + " results, " + spillWriteCount + " written, " + spillReadCount + " read.");
- }
- }
-
- private void joinLoopOnMemory(IFrameWriter writer) throws HyracksDataException {
- RunFileReader pReader = probeRunFileWriter.createDeleteOnCloseReader();
- pReader.open();
- // Load first frame.
- loadReaderNextFrame(pReader);
-
- while (moreBuildProcessing) {
- fillMemory();
- joinMemoryBlockWithRunFile(writer, pReader);
-
- // Clean up
- for (int pid : buildInMemoryPartitions) {
- buildBufferManager.clearPartition(pid);
- }
- buildInMemoryPartitions.clear();
- }
- pReader.close();
- }
-
- private void joinMemoryBlockWithRunFile(IFrameWriter writer, RunFileReader pReader) throws HyracksDataException {
- // Join Disk partitions with Memory partitions
- for (RunFilePointer probeId : probeRunFilePointers.navigableKeySet()) {
- Pair<Integer, Integer> probe = IntervalPartitionUtil.getIntervalPartition(probeRunFilePointers.get(probeId),
- k);
- for (int buildId : buildInMemoryPartitions) {
- Pair<Integer, Integer> build = IntervalPartitionUtil.getIntervalPartition(buildId, k);
- if (imjc.compareIntervalPartition(probe.first, probe.second, build.first, build.second)) {
- fbms.add(buildBufferManager.getPartitionFrameBufferManager(buildId));
- }
- }
- if (!fbms.isEmpty()) {
- join(pReader, probeId, fbms, writer);
- }
- fbms.clear();
- }
- }
-
- private void join(RunFileReader pReader, RunFilePointer rfpStart, List<IFrameBufferManager> buildFbms,
- IFrameWriter writer) throws HyracksDataException {
- long fileOffsetStart = rfpStart.getFileOffset();
- int tupleStart = rfpStart.getTupleIndex();
-
- RunFilePointer rfpEnd = probeRunFilePointers.higherKey(rfpStart);
- long fileOffsetEnd = rfpEnd == null ? pReader.getFileSize() : rfpEnd.getFileOffset();
- int tupleEnd = rfpEnd == null ? Integer.MAX_VALUE : rfpEnd.getTupleIndex();
-
- if (pReader.getReadPointer() != fileOffsetStart) {
- pReader.reset(fileOffsetStart);
- loadReaderNextFrame(pReader);
- }
- do {
- int start = pReader.getReadPointer() == fileOffsetStart ? tupleStart : 0;
- int end = pReader.getReadPointer() == fileOffsetEnd ? tupleEnd : accessorProbe.getTupleCount();
-
- for (int i = start; i < end; ++i) {
- // Tuple has potential match from build phase
- for (IFrameBufferManager fbm : buildFbms) {
- joinTupleWithMemoryPartition(accessorProbe, i, fbm, writer);
- }
- }
- } while (pReader.getReadPointer() < fileOffsetEnd && loadReaderNextFrame(pReader));
- }
-
- private boolean loadReaderNextFrame(RunFileReader pReader) throws HyracksDataException {
- if (pReader.nextFrame(reloadBuffer)) {
- accessorProbe.reset(reloadBuffer.getBuffer());
- spillReadCount++;
- return true;
- }
- return false;
- }
-
- public void joinTupleWithMemoryPartition(IFrameTupleAccessor accessorProbe, int probeTupleIndex,
- IFrameBufferManager fbm, IFrameWriter writer) throws HyracksDataException {
- if (fbm.getNumFrames() == 0) {
- return;
- }
- fbm.resetIterator();
- int frameIndex = fbm.next();
- while (fbm.exists()) {
- fbm.getFrame(frameIndex, bufferInfo);
- accessorBuild.reset(bufferInfo.getBuffer());
- for (int buildTupleIndex = 0; buildTupleIndex < accessorBuild.getTupleCount(); ++buildTupleIndex) {
- if (imjc.checkToSaveInResult(accessorBuild, buildTupleIndex, accessorProbe, probeTupleIndex, false)) {
- appendToResult(accessorBuild, buildTupleIndex, accessorProbe, probeTupleIndex, writer);
- }
- joinComparisonCount++;
- }
- frameIndex = fbm.next();
- }
- }
-
- private void appendToResult(IFrameTupleAccessor accessorBuild, int buildSidetIx, IFrameTupleAccessor accessorProbe,
- int probeSidetIx, IFrameWriter writer) throws HyracksDataException {
- FrameUtils.appendConcatToWriter(writer, resultAppender, accessorBuild, buildSidetIx, accessorProbe,
- probeSidetIx);
- joinResultCount++;
- }
-
- private void fillMemory() throws HyracksDataException {
- int buildPid = -1;
- TupleStatus ts;
- for (ts = loadRightTuple(); ts.isLoaded(); ts = loadRightTuple()) {
- int pid = buildHpc.partition(inputAccessor[RIGHT_PARTITION], inputAccessor[RIGHT_PARTITION].getTupleId(),
- k);
- if (!buildBufferManager.insertTuple(pid, inputAccessor[RIGHT_PARTITION],
- inputAccessor[RIGHT_PARTITION].getTupleId(), tempPtr)) {
- return;
- }
-
- if (buildPid != pid) {
- // Track new partitions in memory.
- buildInMemoryPartitions.add(pid);
- buildPid = pid;
- }
- inputAccessor[RIGHT_PARTITION].next();
- buildSize++;
- }
- if (ts.isEmpty()) {
- moreBuildProcessing = false;
- }
- }
-
- private TupleStatus loadRightTuple() throws HyracksDataException {
- TupleStatus loaded = loadMemoryTuple(RIGHT_PARTITION);
- if (loaded == TupleStatus.UNKNOWN) {
- loaded = pauseAndLoadRightTuple();
- }
- return loaded;
- }
-
-}