You are viewing a plain-text version of this content. (The link to the canonical version was lost in the plain-text conversion.)
Posted to commits@asterixdb.apache.org by pr...@apache.org on 2016/10/17 19:55:34 UTC
[44/50] [abbrv] asterixdb git commit: new partition join algorithm
new partition join algorithm
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/0b900514
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/0b900514
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/0b900514
Branch: refs/heads/ecarm002/interval_join_merge
Commit: 0b90051413c96ae52cc3136d41b5589275f11385
Parents: b34a426
Author: Preston Carman <pr...@apache.org>
Authored: Fri Sep 30 13:56:13 2016 -0700
Committer: Preston Carman <pr...@apache.org>
Committed: Fri Sep 30 13:56:13 2016 -0700
----------------------------------------------------------------------
.../IntervalPartitionJoinPOperator.java | 2 +-
.../InMemoryIntervalPartitionJoin.java | 98 ---
...IntervalPartitionJoinOperatorDescriptor.java | 355 +++++----
.../IntervalPartitionJoinTaskState.java | 33 +
.../IntervalPartitionJoiner.java | 772 ++++---------------
.../IntervalPartitionUtil.java | 70 --
...IntervalPartitionJoinOperatorDescriptor.java | 319 --------
.../IntervalPartitionJoinTaskState.java | 33 -
.../IntervalPartitionJoiner.java | 288 -------
9 files changed, 412 insertions(+), 1558 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0b900514/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java
index af77a92..73d159e 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java
@@ -23,7 +23,7 @@ import java.util.List;
import java.util.logging.Logger;
import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinCheckerFactory;
-import org.apache.asterix.runtime.operators.joins.intervalpartition2.IntervalPartitionJoinOperatorDescriptor;
+import org.apache.asterix.runtime.operators.joins.intervalpartition.IntervalPartitionJoinOperatorDescriptor;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0b900514/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/InMemoryIntervalPartitionJoin.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/InMemoryIntervalPartitionJoin.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/InMemoryIntervalPartitionJoin.java
deleted file mode 100644
index aeea209..0000000
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/InMemoryIntervalPartitionJoin.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.operators.joins.intervalpartition;
-
-import java.util.logging.Logger;
-
-import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinChecker;
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import org.apache.hyracks.dataflow.std.buffermanager.BufferInfo;
-import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager;
-
-public class InMemoryIntervalPartitionJoin {
-
- private static final Logger LOGGER = Logger.getLogger(InMemoryIntervalPartitionJoin.class.getName());
-
- private final FrameTupleAccessor accessorBuild;
- private final FrameTupleAppender appender;
- private final IFrameBufferManager fbm;
- private BufferInfo bufferInfo;
- private final IIntervalMergeJoinChecker imjc;
-
- private long joinComparisonCount = 0;
- private long joinResultCount = 0;
-
- public InMemoryIntervalPartitionJoin(IHyracksTaskContext ctx, IFrameBufferManager fbm,
- IIntervalMergeJoinChecker imjc, RecordDescriptor buildRd, RecordDescriptor probeRd)
- throws HyracksDataException {
- bufferInfo = new BufferInfo(null, -1, -1);
- this.accessorBuild = new FrameTupleAccessor(buildRd);
- appender = new FrameTupleAppender(new VSizeFrame(ctx));
- this.imjc = imjc;
- this.fbm = fbm;
- LOGGER.fine(
- "InMemoryIntervalPartitionJoin has been created for Thread ID " + Thread.currentThread().getId() + ".");
- }
-
- public long getComparisonCount() {
- return joinComparisonCount;
- }
-
- public long getResultCount() {
- return joinResultCount;
- }
-
- public void join(IFrameTupleAccessor accessorProbe, int probeTupleIndex, IFrameWriter writer)
- throws HyracksDataException {
- if (fbm.getNumFrames() != 0) {
- fbm.resetIterator();
- int frameIndex = fbm.next();
- while (fbm.exists()) {
- fbm.getFrame(frameIndex, bufferInfo);
- accessorBuild.reset(bufferInfo.getBuffer());
- for (int buildTupleIndex = 0; buildTupleIndex < accessorBuild.getTupleCount(); ++buildTupleIndex) {
- if (imjc.checkToSaveInResult(accessorBuild, buildTupleIndex, accessorProbe, probeTupleIndex,
- false)) {
- appendToResult(accessorBuild, buildTupleIndex, accessorProbe, probeTupleIndex, writer);
- }
- joinComparisonCount++;
- }
- frameIndex = fbm.next();
- }
- }
- }
-
- public void closeJoin(IFrameWriter writer) throws HyracksDataException {
- appender.write(writer, true);
- }
-
- private void appendToResult(IFrameTupleAccessor accessorBuild, int buildSidetIx, IFrameTupleAccessor accessorProbe,
- int probeSidetIx, IFrameWriter writer) throws HyracksDataException {
- FrameUtils.appendConcatToWriter(writer, appender, accessorBuild, buildSidetIx, accessorProbe, probeSidetIx);
- joinResultCount++;
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0b900514/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
index b4965ef..ddbe913 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
@@ -20,7 +20,6 @@
package org.apache.asterix.runtime.operators.joins.intervalpartition;
import java.nio.ByteBuffer;
-import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinChecker;
@@ -36,34 +35,29 @@ import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
import org.apache.hyracks.dataflow.std.base.RangeId;
+import org.apache.hyracks.dataflow.std.join.MergeBranchStatus.Stage;
+import org.apache.hyracks.dataflow.std.join.MergeJoinLocks;
import org.apache.hyracks.dataflow.std.misc.RangeForwardOperatorDescriptor.RangeForwardTaskState;
public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDescriptor {
private static final long serialVersionUID = 1L;
- private static final int BUILD_AND_PARTITION_ACTIVITY_ID = 0;
- private static final int PARTITION_AND_JOIN_ACTIVITY_ID = 1;
-
- private static final String PROBE_REL = "RelR";
- private static final String BUILD_REL = "RelS";
-
- private final int memsize;
- private final int[] probeKeys;
- private final int[] buildKeys;
-
+ private static final int LEFT_ACTIVITY_ID = 0;
+ private static final int RIGHT_ACTIVITY_ID = 1;
+ private final int[] leftKeys;
+ private final int[] rightKeys;
+ private final int memoryForJoin;
+ private final IIntervalMergeJoinCheckerFactory imjcf;
+ private final RangeId rangeId;
private final int k;
private final int probeKey;
private final int buildKey;
- private final IIntervalMergeJoinCheckerFactory imjcf;
- private final RangeId rangeId;
private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoinOperatorDescriptor.class.getName());
@@ -71,180 +65,253 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes
int[] leftKeys, int[] rightKeys, RecordDescriptor recordDescriptor, IIntervalMergeJoinCheckerFactory imjcf,
RangeId rangeId) {
super(spec, 2, 1);
- this.memsize = memoryForJoin;
+ recordDescriptors[0] = recordDescriptor;
this.buildKey = leftKeys[0];
this.probeKey = rightKeys[0];
this.k = k;
- this.buildKeys = leftKeys;
- this.probeKeys = rightKeys;
- recordDescriptors[0] = recordDescriptor;
+ this.leftKeys = leftKeys;
+ this.rightKeys = rightKeys;
+ this.memoryForJoin = memoryForJoin;
this.imjcf = imjcf;
this.rangeId = rangeId;
}
@Override
public void contributeActivities(IActivityGraphBuilder builder) {
- ActivityId p1Aid = new ActivityId(odId, BUILD_AND_PARTITION_ACTIVITY_ID);
- ActivityId p2Aid = new ActivityId(odId, PARTITION_AND_JOIN_ACTIVITY_ID);
- IActivity phase1 = new PartitionAndBuildActivityNode(p1Aid, p2Aid);
- IActivity phase2 = new ProbeAndJoinActivityNode(p2Aid, p1Aid);
+ MergeJoinLocks locks = new MergeJoinLocks();
- builder.addActivity(this, phase1);
- builder.addSourceEdge(0, phase1, 0);
+ ActivityId leftAid = new ActivityId(odId, LEFT_ACTIVITY_ID);
+ ActivityId rightAid = new ActivityId(odId, RIGHT_ACTIVITY_ID);
- builder.addActivity(this, phase2);
- builder.addSourceEdge(1, phase2, 0);
+ IActivity leftAN = new LeftJoinerActivityNode(leftAid, rightAid, locks);
+ IActivity rightAN = new RightDataActivityNode(rightAid, leftAid, locks);
- builder.addBlockingEdge(phase1, phase2);
+ builder.addActivity(this, rightAN);
+ builder.addSourceEdge(1, rightAN, 0);
- builder.addTargetEdge(0, phase2, 0);
+ builder.addActivity(this, leftAN);
+ builder.addSourceEdge(0, leftAN, 0);
+ builder.addTargetEdge(0, leftAN, 0);
}
- public static class BuildAndPartitionTaskState extends AbstractStateObject {
- private IntervalPartitionJoiner ipj;
- private int intervalPartitions;
- private int partition;
- private int k;
- private int memoryForJoin;
-
- private BuildAndPartitionTaskState(JobId jobId, TaskId taskId) {
- super(jobId, taskId);
- }
- }
-
- private class PartitionAndBuildActivityNode extends AbstractActivityNode {
+ private class LeftJoinerActivityNode extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
- private final ActivityId probeAid;
+ private final MergeJoinLocks locks;
- public PartitionAndBuildActivityNode(ActivityId id, ActivityId probeAid) {
+ public LeftJoinerActivityNode(ActivityId id, ActivityId joinAid, MergeJoinLocks locks) {
super(id);
- this.probeAid = probeAid;
+ this.locks = locks;
}
@Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
-
- final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
- final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(probeAid, 0);
-
- return new AbstractUnaryInputSinkOperatorNodePushable() {
- private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(
- ctx.getJobletContext().getJobId(), new TaskId(getActivityId(), partition));
- private boolean failure = false;
-
- @Override
- public void open() throws HyracksDataException {
- if (memsize <= 2) {
- // Dedicated buffers: One buffer to read and one buffer for output
- failure = true;
- throw new HyracksDataException("not enough memory for join");
- }
- state.k = k;
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+ throws HyracksDataException {
+ locks.setPartitions(nPartitions);
+ final RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+ return new LeftJoinerOperator(ctx, partition, inRecordDesc);
+ }
- RangeForwardTaskState rangeState = RangeForwardTaskState.getRangeState(rangeId.getId(), ctx);
- long partitionStart = IntervalPartitionUtil.getStartOfPartition(rangeState.getRangeMap(),
- partition);
- long partitionEnd = IntervalPartitionUtil.getEndOfPartition(rangeState.getRangeMap(), partition);
- ITuplePartitionComputer buildHpc = new IntervalPartitionComputerFactory(buildKey, state.k,
- partitionStart, partitionEnd).createPartitioner();
- ITuplePartitionComputer probeHpc = new IntervalPartitionComputerFactory(probeKey, state.k,
- partitionStart, partitionEnd).createPartitioner();
+ private class LeftJoinerOperator extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
- state.partition = partition;
- state.intervalPartitions = IntervalPartitionUtil.getMaxPartitions(state.k);
- state.memoryForJoin = memsize;
- IIntervalMergeJoinChecker imjc = imjcf.createMergeJoinChecker(buildKeys, probeKeys, partition, ctx);
- state.ipj = new IntervalPartitionJoiner(ctx, state.memoryForJoin, state.k, state.intervalPartitions,
- BUILD_REL, PROBE_REL, imjc, buildRd, probeRd, buildHpc, probeHpc);
-
- state.ipj.buildInit();
- LOGGER.setLevel(Level.FINE);
- System.out
- .println("IntervalPartitionJoinOperatorDescriptor: Logging level is: " + LOGGER.getLevel());
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine("IntervalPartitionJoin is starting the build phase with " + state.k
- + " granules repesenting " + state.intervalPartitions + " interval partitions using "
- + state.memoryForJoin + " frames for memory.");
- }
- }
+ private final IHyracksTaskContext ctx;
+ private final int partition;
+ private final RecordDescriptor leftRd;
+ private IntervalPartitionJoinTaskState state;
+ private boolean first = true;
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- state.ipj.buildStep(buffer);
- }
+ public LeftJoinerOperator(IHyracksTaskContext ctx, int partition, RecordDescriptor inRecordDesc) {
+ this.ctx = ctx;
+ this.partition = partition;
+ this.leftRd = inRecordDesc;
+ }
- @Override
- public void close() throws HyracksDataException {
- if (!failure) {
- state.ipj.buildClose();
- ctx.setStateObject(state);
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine("IntervalPartitionJoin closed its build phase");
+ @Override
+ public void open() throws HyracksDataException {
+ locks.getLock(partition).lock();
+ try {
+ writer.open();
+ state = new IntervalPartitionJoinTaskState(ctx.getJobletContext().getJobId(),
+ new TaskId(getActivityId(), partition));;
+ state.leftRd = leftRd;
+ ctx.setStateObject(state);
+ locks.getRight(partition).signal();
+
+ do {
+ // Continue after joiner created in right branch.
+ if (state.partitionJoiner == null) {
+ locks.getLeft(partition).await();
}
- }
+ } while (state.partitionJoiner == null);
+ state.status.branch[LEFT_ACTIVITY_ID].setStageOpen();
+ locks.getRight(partition).signal();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ locks.getLock(partition).unlock();
}
-
- @Override
- public void fail() throws HyracksDataException {
- failure = true;
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ locks.getLock(partition).lock();
+ if (first) {
+ state.status.branch[LEFT_ACTIVITY_ID].setStageData();
+ first = false;
}
-
- };
-
+ try {
+ state.partitionJoiner.setFrame(LEFT_ACTIVITY_ID, buffer);
+ state.partitionJoiner.processLeftFrame(writer);
+ } finally {
+ locks.getLock(partition).unlock();
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ locks.getLock(partition).lock();
+ try {
+ state.failed = true;
+ } finally {
+ locks.getLock(partition).unlock();
+ }
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ locks.getLock(partition).lock();
+ try {
+ state.status.branch[LEFT_ACTIVITY_ID].noMore();
+ if (state.failed) {
+ writer.fail();
+ } else {
+ state.partitionJoiner.processLeftClose(writer);
+ writer.close();
+ }
+ state.status.branch[LEFT_ACTIVITY_ID].setStageClose();
+ locks.getRight(partition).signal();
+ } finally {
+ locks.getLock(partition).unlock();
+ }
+ }
}
}
- private class ProbeAndJoinActivityNode extends AbstractActivityNode {
+ private class RightDataActivityNode extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
- public ProbeAndJoinActivityNode(ActivityId id, ActivityId buildAid) {
+ private final ActivityId joinAid;
+ private final MergeJoinLocks locks;
+
+ public RightDataActivityNode(ActivityId id, ActivityId joinAid, MergeJoinLocks locks) {
super(id);
+ this.joinAid = joinAid;
+ this.locks = locks;
}
@Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
throws HyracksDataException {
+ locks.setPartitions(nPartitions);
+ RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+ return new RightDataOperator(ctx, partition, inRecordDesc);
+ }
- return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
- private BuildAndPartitionTaskState state;
-
- @Override
- public void open() throws HyracksDataException {
- state = (BuildAndPartitionTaskState) ctx.getStateObject(
- new TaskId(new ActivityId(getOperatorId(), BUILD_AND_PARTITION_ACTIVITY_ID), partition));
-
- writer.open();
- state.ipj.probeInit();
-
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine("IntervalPartitionJoin is starting the probe phase.");
- }
- }
+ private class RightDataOperator extends AbstractUnaryInputSinkOperatorNodePushable {
+
+ private int partition;
+ private IHyracksTaskContext ctx;
+ private final RecordDescriptor rightRd;
+ private IntervalPartitionJoinTaskState state;
+ private boolean first = true;
+
+ public RightDataOperator(IHyracksTaskContext ctx, int partition, RecordDescriptor inRecordDesc) {
+ this.ctx = ctx;
+ this.partition = partition;
+ this.rightRd = inRecordDesc;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ locks.getLock(partition).lock();
+ try {
+ do {
+ // Wait for the state to be set in the context from Left.
+ state = (IntervalPartitionJoinTaskState) ctx.getStateObject(new TaskId(joinAid, partition));
+ if (state == null) {
+ locks.getRight(partition).await();
+ }
+ } while (state == null);
+ state.k = k;
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- state.ipj.probeStep(buffer, writer);
+ RangeForwardTaskState rangeState = RangeForwardTaskState.getRangeState(rangeId.getId(), ctx);
+ long partitionStart = IntervalPartitionUtil.getStartOfPartition(rangeState.getRangeMap(),
+ partition);
+ long partitionEnd = IntervalPartitionUtil.getEndOfPartition(rangeState.getRangeMap(), partition);
+ ITuplePartitionComputer buildHpc = new IntervalPartitionComputerFactory(buildKey, state.k,
+ partitionStart, partitionEnd).createPartitioner();
+ ITuplePartitionComputer probeHpc = new IntervalPartitionComputerFactory(probeKey, state.k,
+ partitionStart, partitionEnd).createPartitioner();
+ IIntervalMergeJoinChecker imjc = imjcf.createMergeJoinChecker(leftKeys, rightKeys, partition, ctx);
+
+ state.rightRd = rightRd;
+ state.partitionJoiner = new IntervalPartitionJoiner(ctx, memoryForJoin, partition, state.k,
+ state.status, locks, imjc, state.leftRd, state.rightRd, buildHpc, probeHpc);
+ state.status.branch[RIGHT_ACTIVITY_ID].setStageOpen();
+ locks.getLeft(partition).signal();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ locks.getLock(partition).unlock();
}
-
- @Override
- public void fail() throws HyracksDataException {
- writer.fail();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ locks.getLock(partition).lock();
+ if (first) {
+ state.status.branch[RIGHT_ACTIVITY_ID].setStageData();
+ first = false;
}
-
- @Override
- public void close() throws HyracksDataException {
- state.ipj.probeClose(writer);
- state.ipj.joinSpilledPartitions(writer);
- state.ipj.closeAndDeleteRunFiles();
- writer.close();
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine("IntervalPartitionJoin closed its probe phase");
+ try {
+ while (!state.status.continueRightLoad
+ && state.status.branch[LEFT_ACTIVITY_ID].getStatus() != Stage.CLOSED) {
+ // Wait for the state to request right frame unless left has finished.
+ locks.getRight(partition).await();
}
+ state.partitionJoiner.setFrame(RIGHT_ACTIVITY_ID, buffer);
+ state.status.continueRightLoad = false;
+ locks.getLeft(partition).signal();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ locks.getLock(partition).unlock();
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ locks.getLock(partition).lock();
+ try {
+ state.failed = true;
+ locks.getLeft(partition).signal();
+ } finally {
+ locks.getLock(partition).unlock();
+ }
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ locks.getLock(partition).lock();
+ try {
+ state.status.branch[RIGHT_ACTIVITY_ID].setStageClose();
+ locks.getLeft(partition).signal();
+ } finally {
+ locks.getLock(partition).unlock();
}
- };
+ }
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0b900514/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinTaskState.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinTaskState.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinTaskState.java
new file mode 100644
index 0000000..1939899
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinTaskState.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.operators.joins.intervalpartition;
+
+import org.apache.hyracks.api.dataflow.TaskId;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.dataflow.std.join.MergeJoinTaskState;
+
+public class IntervalPartitionJoinTaskState extends MergeJoinTaskState {
+ protected IntervalPartitionJoiner partitionJoiner;
+ public int k;
+
+ public IntervalPartitionJoinTaskState(JobId jobId, TaskId taskId) {
+ super(jobId, taskId);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0b900514/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
index 9c5a872..984db20 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
@@ -18,21 +18,17 @@
*/
package org.apache.asterix.runtime.operators.joins.intervalpartition;
-import java.nio.ByteBuffer;
-import java.util.BitSet;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.Map.Entry;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.TreeMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinChecker;
-import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.api.comm.IFrameTupleAppender;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -41,685 +37,251 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
import org.apache.hyracks.dataflow.common.io.RunFileReader;
import org.apache.hyracks.dataflow.common.io.RunFileWriter;
-import org.apache.hyracks.dataflow.std.buffermanager.IPartitionedMemoryConstrain;
+import org.apache.hyracks.dataflow.std.buffermanager.BufferInfo;
+import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager;
import org.apache.hyracks.dataflow.std.buffermanager.VPartitionTupleBufferManager;
+import org.apache.hyracks.dataflow.std.join.AbstractMergeJoiner;
+import org.apache.hyracks.dataflow.std.join.MergeJoinLocks;
+import org.apache.hyracks.dataflow.std.join.MergeStatus;
+import org.apache.hyracks.dataflow.std.structures.RunFilePointer;
import org.apache.hyracks.dataflow.std.structures.TuplePointer;
-/**
- * The Interval Partition Join runs in three stages: build, probe-in-memory, probe-spill.
- * build: Saves all build partitions either to memory or disk.
- * probe-in-memory: Joins all in memory partitions and saves the necessary partitions to disk.
- * probe-spill: Spilled build partitions are loaded into memory and joined with all probe remaining partitions.
- */
-public class IntervalPartitionJoiner {
+public class IntervalPartitionJoiner extends AbstractMergeJoiner {
private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoiner.class.getName());
- private enum SIDE {
- BUILD,
- PROBE
- }
-
- private IHyracksTaskContext ctx;
-
- private final String buildRelName;
- private final String probeRelName;
+ private final RunFileWriter probeRunFileWriter;
+ private int probeRunFilePid = -1;
private final ITuplePartitionComputer buildHpc;
private final ITuplePartitionComputer probeHpc;
- private final RecordDescriptor buildRd;
- private final RecordDescriptor probeRd;
-
- private RunFileWriter[] buildRFWriters; //writing spilled build partitions
- private RunFileWriter[] probeRFWriters; //writing spilled probe partitions
-
private final int buildMemory;
private final int k;
private final int numOfPartitions;
- private InMemoryIntervalPartitionJoin[] inMemJoiner; //Used for joining resident partitions
-
- private VPartitionTupleBufferManager buildBufferManager;
+ private long buildSize = 0;
+ private long probeSize = 0;
+ private final TreeMap<RunFilePointer, Integer> probeRunFilePointers;
+ private final VPartitionTupleBufferManager buildBufferManager;
+ private final TuplePointer tempPtr = new TuplePointer();
+ private final List<Integer> buildInMemoryPartitions;
private final FrameTupleAccessor accessorBuild;
- private final FrameTupleAccessor accessorProbe;
-
- // stats information
- private IntervalPartitionJoinData ipjd;
-
- private IFrame reloadBuffer;
- private TuplePointer tempPtr = new TuplePointer();
-
- private IIntervalMergeJoinChecker imjc;
+ private BufferInfo bufferInfo;
+ private long spillWriteCount = 0;
+ private long spillReadCount = 0;
private long joinComparisonCount = 0;
private long joinResultCount = 0;
- private long spillReadCount = 0;
- private long spillWriteCount = 0;
- private long buildSize;
- private long probeSize;
- private int tmp = -1;
-
- private RunFileWriter probeRunFileWriter = null;
- private final IFrameTupleAppender probeRunFileAppender;
- private int probeRunFilePid = -1;
-
- public IntervalPartitionJoiner(IHyracksTaskContext ctx, int memForJoin, int k, int numOfPartitions,
- String buildRelName, String probeRelName, IIntervalMergeJoinChecker imjc, RecordDescriptor buildRd,
- RecordDescriptor probeRd, ITuplePartitionComputer buildHpc, ITuplePartitionComputer probeHpc)
- throws HyracksDataException {
- this.ctx = ctx;
- // TODO fix available memory size
- this.buildMemory = memForJoin;
- this.k = k;
- this.buildRd = buildRd;
- this.probeRd = probeRd;
- this.buildHpc = buildHpc;
- this.probeHpc = probeHpc;
- this.imjc = imjc;
- this.buildRelName = buildRelName;
- this.probeRelName = probeRelName;
+ private final IIntervalMergeJoinChecker imjc;
+ private final FrameTupleAccessor accessorProbe;
+ private final IFrame reloadBuffer;
+ private boolean moreBuildProcessing = true;
+ private final List<IFrameBufferManager> fbms = new ArrayList<>();
- this.numOfPartitions = numOfPartitions;
- this.buildRFWriters = new RunFileWriter[numOfPartitions];
- this.probeRFWriters = new RunFileWriter[numOfPartitions];
- this.inMemJoiner = new InMemoryIntervalPartitionJoin[numOfPartitions];
+ public IntervalPartitionJoiner(IHyracksTaskContext ctx, int memorySize, int partition, int k, MergeStatus status,
+ MergeJoinLocks locks, IIntervalMergeJoinChecker imjc, RecordDescriptor leftRd, RecordDescriptor rightRd,
+ ITuplePartitionComputer buildHpc, ITuplePartitionComputer probeHpc) throws HyracksDataException {
+ super(ctx, partition, status, locks, leftRd, rightRd);
- this.accessorBuild = new FrameTupleAccessor(buildRd);
- this.accessorProbe = new FrameTupleAccessor(probeRd);
+ bufferInfo = new BufferInfo(null, -1, -1);
+ this.accessorProbe = new FrameTupleAccessor(leftRd);
reloadBuffer = new VSizeFrame(ctx);
- probeRunFileAppender = new FrameTupleAppender(new VSizeFrame(ctx));
- ipjd = new IntervalPartitionJoinData(k, imjc, numOfPartitions);
- }
- public void buildInit() throws HyracksDataException {
+ this.numOfPartitions = IntervalPartitionUtil.getMaxPartitions(k);;
+ this.imjc = imjc;
+
+ // TODO fix available memory size
+ this.buildMemory = memorySize;
buildBufferManager = new VPartitionTupleBufferManager(ctx, VPartitionTupleBufferManager.NO_CONSTRAIN,
numOfPartitions, buildMemory * ctx.getInitialFrameSize());
- System.err.println("k: " + k);
- buildSize = 0;
- }
- public void buildStep(ByteBuffer buffer) throws HyracksDataException {
- accessorBuild.reset(buffer);
- int tupleCount = accessorBuild.getTupleCount();
+ this.k = k;
+ this.buildHpc = buildHpc;
+ this.probeHpc = probeHpc;
- int pid;
- for (int i = 0; i < tupleCount; ++i) {
- pid = buildHpc.partition(accessorBuild, i, k);
+ FileReference file = ctx.getJobletContext().createManagedWorkspaceFile("IntervalPartitionJoiner");
+ probeRunFileWriter = new RunFileWriter(file, ctx.getIOManager());
+ probeRunFileWriter.open();
- if (tmp != pid) {
- System.err.println("buildSize: " + buildSize + " pid: " + pid + " k: " + k + " pair: "
- + IntervalPartitionUtil.getIntervalPartition(pid, k));
- tmp = pid;
- }
- processTuple(i, pid);
- ipjd.buildIncrementCount(pid);
- buildSize++;
- }
- }
+ probeRunFilePointers = new TreeMap<>(RunFilePointer.ASC);
+ buildInMemoryPartitions = new LinkedList<>();
- public void buildClose() throws HyracksDataException {
- System.err.println("buildSize: " + buildSize);
-
- int inMemoryPartitions = 0;
- int totalBuildPartitions = 0;
- flushAndClearBuildSpilledPartition();
-
- // Trying to bring back as many spilled partitions as possible, making them resident
- bringBackSpilledPartitionIfHasMoreMemory(false);
-
- // Update build partition join map based on partitions with actual data.
- for (int i = ipjd.buildNextInMemory(0); i >= 0; i = ipjd.buildNextInMemory(i + 1)) {
- if (ipjd.buildGetCount(i) == 0) {
- ipjd.buildRemoveFromJoin(i);
- } else if (ipjd.buildGetCount(i) > 0) {
- // Set up build memory for processing joins for partitions in memory.
- createInMemoryJoiner(i);
- inMemoryPartitions++;
- totalBuildPartitions += ipjd.buildGetCount(i);
- }
- }
+        // Build side reads the right input; the probe side (accessorProbe) reads the left input.
+        this.accessorBuild = new FrameTupleAccessor(rightRd);
         if (LOGGER.isLoggable(Level.FINE)) {
-            LOGGER.fine("IntervalPartitionJoin has closed the build phase. Total tuples: " + totalBuildPartitions
-                    + ", In memory partitions: " + inMemoryPartitions + ", Spilled partitions: "
-                    + ipjd.buildGetSpilledCount());
+            LOGGER.fine("IntervalPartitionJoiner has started partition " + partition + " with " + memorySize
+                    + " frames of memory.");
         }
     }
- private void processTuple(int tid, int pid) throws HyracksDataException {
- while (!buildBufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) {
- int victimPartition = selectPartitionToSpill();
- if (victimPartition < 0) {
- throw new HyracksDataException(
- "No more space left in the memory buffer, please give join more memory budgets.");
- }
- spillPartition(victimPartition);
- }
- }
-
- private int selectPartitionToSpill() {
- int partitionToSpill = selectLargestSpilledPartition();
- int maxToSpillPartSize = 0;
- if (partitionToSpill < 0 || (maxToSpillPartSize = buildBufferManager.getPhysicalSize(partitionToSpill)) == ctx
- .getInitialFrameSize()) {
- int partitionInMem = selectNextInMemoryPartitionToSpill();
- if (partitionInMem >= 0 && buildBufferManager.getPhysicalSize(partitionInMem) > maxToSpillPartSize) {
- partitionToSpill = partitionInMem;
+ @Override
+ public void processLeftFrame(IFrameWriter writer) throws HyracksDataException {
+ while (inputAccessor[LEFT_PARTITION].exists()) {
+ int pid = probeHpc.partition(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(), k);
+
+ if (probeRunFilePid != pid) {
+ // Log new partition locations.
+ RunFilePointer rfp = new RunFilePointer(probeRunFileWriter.getFileSize(),
+ inputAccessor[LEFT_PARTITION].getTupleId());
+ probeRunFilePointers.put(rfp, pid);
+ probeRunFilePid = pid;
}
+ inputAccessor[LEFT_PARTITION].next();
+ probeSize++;
}
- return partitionToSpill;
+ inputBuffer[LEFT_PARTITION].rewind();
+ probeRunFileWriter.nextFrame(inputBuffer[LEFT_PARTITION]);
+ spillWriteCount++;
}
- /**
- * Select next partition to spill. The partitions have been numbered in the order they should be spilled.
- *
- * @return
- */
- private int selectNextInMemoryPartitionToSpill() {
- for (int i = ipjd.buildNextInMemoryWithResults(0); i >= 0; i = ipjd.buildNextInMemoryWithResults(i + 1)) {
- if (!ipjd.buildIsSpilled(i) && buildBufferManager.getPhysicalSize(i) > 0) {
- return i;
- }
- }
- return -1;
- }
+    /**
+     * Completes the join once the left (probe) input is closed: joins the probe run file with the build input,
+     * flushes buffered results to {@code writer}, and logs join statistics.
+     */
+    @Override
+    public void processLeftClose(IFrameWriter writer) throws HyracksDataException {
+        joinLoopOnMemory(writer);
-    private int selectLargestSpilledPartition() {
-        int pid = -1;
-        int max = 0;
-        for (int i = ipjd.buildNextSpilled(0); i >= 0; i = ipjd.buildNextSpilled(i + 1)) {
-            int partSize = buildBufferManager.getPhysicalSize(i);
-            if (partSize > max) {
-                max = partSize;
-                pid = i;
-            }
+        // Flush result.
+        resultAppender.write(writer, true);
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.warning("IntervalPartitionJoiner statistics: " + k + " k, " + buildSize + " build tuples, "
+                    + probeSize + " probe tuples, " + joinComparisonCount + " comparisons, " + joinResultCount
+                    + " results, " + spillWriteCount + " written, " + spillReadCount + " read.");
         }
-        return pid;
     }
- private void spillPartition(int pid) throws HyracksDataException {
- RunFileWriter writer = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.BUILD);
- spillWriteCount += buildBufferManager.getNumFrames(pid);
- buildBufferManager.flushPartition(pid, writer);
- buildBufferManager.clearPartition(pid);
- ipjd.buildSpill(pid);
- }
+    /**
+     * Joins the probe (left) run file with the build (right) input, loading one memory-sized block of build
+     * tuples at a time until the build input is exhausted.
+     */
+    private void joinLoopOnMemory(IFrameWriter writer) throws HyracksDataException {
+        // All probe frames have been written; close the writer before reading. The delete-on-close reader
+        // removes the file when it is closed, which should not happen while a writer handle is still open on it.
+        probeRunFileWriter.close();
+        RunFileReader pReader = probeRunFileWriter.createDeleteOnCloseReader();
+        pReader.open();
+        // Load first frame.
+        loadReaderNextFrame(pReader);
- private RunFileWriter getSpillWriterOrCreateNewOneIfNotExist(int pid, SIDE whichSide) throws HyracksDataException {
- RunFileWriter[] runFileWriters = null;
- String refName = null;
- switch (whichSide) {
- case BUILD:
- runFileWriters = buildRFWriters;
- refName = buildRelName;
- break;
- case PROBE:
- refName = probeRelName;
- runFileWriters = probeRFWriters;
- break;
- default:
- }
- RunFileWriter writer = runFileWriters[pid];
- if (writer == null) {
- FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(refName);
- writer = new RunFileWriter(file, ctx.getIOManager());
- writer.open();
- runFileWriters[pid] = writer;
- }
- return writer;
- }
+ while (moreBuildProcessing) {
+ fillMemory();
+ joinMemoryBlockWithRunFile(writer, pReader);
- public void clearBuildMemory() throws HyracksDataException {
- for (int pid = 0; pid < numOfPartitions; ++pid) {
- if (buildBufferManager.getNumTuples(pid) > 0) {
+ // Clean up
+ for (int pid : buildInMemoryPartitions) {
buildBufferManager.clearPartition(pid);
}
+ buildInMemoryPartitions.clear();
}
- ipjd.buildClearMemory();
- }
-
- private void flushAndClearBuildSpilledPartition() throws HyracksDataException {
- for (int pid = ipjd.buildNextSpilled(0); pid >= 0; pid = ipjd.buildNextSpilled(pid + 1)) {
- if (buildBufferManager.getNumTuples(pid) > 0) {
- spillWriteCount += buildBufferManager.getNumFrames(pid);
- RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.BUILD);
- buildBufferManager.flushPartition(pid, runFileWriter);
- buildBufferManager.clearPartition(pid);
- buildRFWriters[pid].close();
- }
- }
- }
-
- private void flushProbeSpilledPartition() throws HyracksDataException {
- if (probeRunFileWriter != null) {
- // flush previous runFile
- probeRunFileAppender.write(probeRunFileWriter, true);
- probeRunFileWriter.close();
- spillWriteCount++;
- }
- }
-
- private void bringBackSpilledPartitionIfHasMoreMemory(boolean partitalLoad) throws HyracksDataException {
- int freeFrames = buildMemory;
- for (int i = ipjd.buildNextInMemoryWithResults(0); i >= 0; i = ipjd.buildNextInMemoryWithResults(i + 1)) {
- freeFrames -= buildBufferManager.getNumFrames(i);
- }
-
- int pid = 0;
- while ((pid = selectPartitionsToReload(freeFrames, pid, partitalLoad)) >= 0 && freeFrames > 0) {
- if (pid == 225) {
- int i = 0;
- }
- if (!loadPartitionInMem(pid, buildRFWriters[pid])) {
- return;
- }
- freeFrames -= buildBufferManager.getNumFrames(pid);
- }
+ pReader.close();
}
- int buildParitialLoadPid = -1;
- int buildParitialNextTid = -1;
- long buildParitialResetReader = -1;
-
- private boolean loadPartitionInMem(int pid, RunFileWriter wr) throws HyracksDataException {
- if (pid == 225) {
- int i = 0;
- }
- RunFileReader r = wr.createReader();
- r.open();
- if (buildParitialLoadPid == pid && buildParitialResetReader > 0) {
- r.reset(buildParitialResetReader);
- }
- int framesLoaded = 0;
- while (r.nextFrame(reloadBuffer)) {
- framesLoaded++;
- accessorBuild.reset(reloadBuffer.getBuffer());
- spillReadCount++;
- for (int tid = buildParitialNextTid > 0 ? buildParitialNextTid : 0; tid < accessorBuild
- .getTupleCount(); tid++) {
- if (!buildBufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) {
- // for some reason (e.g. due to fragmentation) if the inserting failed
- // we need to start this partition from this location on the next round.
- buildParitialLoadPid = pid;
- buildParitialNextTid = tid;
- buildParitialResetReader = r.getReadPointer();
- ipjd.buildLoad(pid);
- createInMemoryJoiner(pid);
- r.close();
- return false;
+ private void joinMemoryBlockWithRunFile(IFrameWriter writer, RunFileReader pReader) throws HyracksDataException {
+ // Join Disk partitions with Memory partitions
+ for (RunFilePointer probeId : probeRunFilePointers.navigableKeySet()) {
+ Pair<Integer, Integer> probe = IntervalPartitionUtil.getIntervalPartition(probeRunFilePointers.get(probeId),
+ k);
+ for (int buildId : buildInMemoryPartitions) {
+ Pair<Integer, Integer> build = IntervalPartitionUtil.getIntervalPartition(buildId, k);
+ if (imjc.compareIntervalPartition(probe.first, probe.second, build.first, build.second)) {
+ fbms.add(buildBufferManager.getPartitionFrameBufferManager(buildId));
}
}
- }
- if (framesLoaded == 0) {
- int t = 0;
- }
-
- ipjd.buildLoad(pid);
- createInMemoryJoiner(pid);
- r.close();
- buildRFWriters[pid] = null;
- buildParitialLoadPid = -1;
- buildParitialNextTid = -1;
- buildParitialResetReader = -1;
- return true;
- }
-
- private int selectPartitionsToReload(int freeFrames, int pid, boolean partitalLoad) {
- int freeSpace = freeFrames * ctx.getInitialFrameSize();
- if (freeSpace > 0 && buildParitialLoadPid > 0 && buildParitialResetReader > 0) {
- return buildParitialLoadPid;
- }
- for (int id = ipjd.buildNextSpilled(pid); id >= 0; id = ipjd.buildNextSpilled(id + 1)) {
- assert buildRFWriters[id].getFileSize() > 0 : "How come a spilled partition have size 0?";
- if (partitalLoad || freeSpace >= buildRFWriters[id].getFileSize()) {
- return id;
+ if (!fbms.isEmpty()) {
+ join(pReader, probeId, fbms, writer);
}
+ fbms.clear();
}
- return -1;
}
- private void createInMemoryJoiner(int pid) throws HyracksDataException {
- inMemJoiner[pid] = new InMemoryIntervalPartitionJoin(ctx,
- buildBufferManager.getPartitionFrameBufferManager(pid), imjc, buildRd, probeRd);
- }
-
- private void closeInMemoryJoiner(int pid, IFrameWriter writer) throws HyracksDataException {
- joinComparisonCount += inMemJoiner[pid].getComparisonCount();
- joinResultCount += inMemJoiner[pid].getResultCount();
- inMemJoiner[pid].closeJoin(writer);
- inMemJoiner[pid] = null;
- }
-
- public void probeInit() throws HyracksDataException {
- probeRFWriters = new RunFileWriter[numOfPartitions];
- probeSize = 0;
- }
-
- public void probeStep(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
- accessorProbe.reset(buffer);
- int tupleCount = accessorProbe.getTupleCount();
+    /**
+     * Joins one probe segment of the run file with the given in-memory build partitions. A segment begins at
+     * {@code rfpStart} and ends just before the next recorded run-file pointer, or at the end of the file.
+     *
+     * @param pReader
+     *            reader over the probe run file
+     * @param rfpStart
+     *            frame offset and tuple index where the probe segment begins
+     * @param buildFbms
+     *            build partitions whose interval partitions may match this probe segment
+     * @param writer
+     *            destination for join results
+     * @throws HyracksDataException
+     */
+    private void join(RunFileReader pReader, RunFilePointer rfpStart, List<IFrameBufferManager> buildFbms,
+            IFrameWriter writer) throws HyracksDataException {
+        long fileOffsetStart = rfpStart.getFileOffset();
+        int tupleStart = rfpStart.getTupleIndex();
-        for (int i = 0; i < tupleCount; ++i) {
-            int pid = probeHpc.partition(accessorProbe, i, k);
+
+        RunFilePointer rfpEnd = probeRunFilePointers.higherKey(rfpStart);
+        long fileOffsetEnd = rfpEnd == null ? pReader.getFileSize() : rfpEnd.getFileOffset();
+        int tupleEnd = rfpEnd == null ? Integer.MAX_VALUE : rfpEnd.getTupleIndex();
-            if (tmp != pid) {
-                System.err.println("probeSize: " + probeSize + " pid: " + pid + " k: " + k + " pair: "
-                        + IntervalPartitionUtil.getIntervalPartition(pid, k));
-                tmp = pid;
-            }
-
-            if (!ipjd.hasProbeJoinMap(pid)) {
-                // Set probe join map
-                ipjd.setProbeJoinMap(pid,
-                        IntervalPartitionUtil.getProbeJoinPartitions(pid, ipjd.buildPSizeInTups, imjc, k));
-            }
-
-            // Tuple has potential match from build phase
-            if (!ipjd.isProbeJoinMapEmpty(pid)) {
-                if (ipjd.probeHasSpilled(pid)) {
-                    // pid is Spilled
-                    probeSpillTuple(accessorProbe, i, pid);
-                }
-                for (Iterator<Integer> pidIterator = ipjd.getProbeJoinMap(pid); pidIterator.hasNext();) {
-                    // pid has join partitions that are Resident
-                    int j = pidIterator.next();
-                    if (inMemJoiner[j] != null) {
-                        inMemJoiner[j].join(accessorProbe, i, writer);
-                    }
-                }
-            }
-            ipjd.probeIncrementCount(pid);
-            probeSize++;
-        }
-    }
-
-    /**
-     * Closes the probe process.
-     * We do NOT join the spilled partitions here, use {@link joinSpilledPartitions}.
-     *
-     * @param writer
-     * @throws HyracksDataException
-     */
-    public void probeClose(IFrameWriter writer) throws HyracksDataException {
-        System.err.println("probeSize: " + probeSize);
-
-        for (int i = 0; i < inMemJoiner.length; ++i) {
-            if (inMemJoiner[i] != null) {
-                closeInMemoryJoiner(i, writer);
-                ipjd.buildLogJoined(i);
-                ipjd.buildRemoveFromJoin(i);
-            }
-        }
-        clearBuildMemory();
-        flushProbeSpilledPartition();
-    }
-
-    private void probeSpillTuple(IFrameTupleAccessor accessorProbe, int probeTupleIndex, int pid)
-            throws HyracksDataException {
-        if (pid != probeRunFilePid) {
-            flushProbeSpilledPartition();
-            probeRunFileWriter = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.PROBE);
-            probeRunFilePid = pid;
-        }
-        if (!probeRunFileAppender.append(accessorProbe, probeTupleIndex)) {
-            probeRunFileAppender.write(probeRunFileWriter, true);
-            probeRunFileAppender.append(accessorProbe, probeTupleIndex);
-            spillWriteCount++;
+
+        // getReadPointer() is the offset just past the frame held in accessorProbe, not that frame's start, so it
+        // cannot tell which frame is loaded. Seek to the segment start and track the loaded frame's offset.
+        pReader.reset(fileOffsetStart);
+        long frameOffset = fileOffsetStart;
+        if (!loadReaderNextFrame(pReader)) {
+            return;
         }
-    }
+        do {
+            int start = frameOffset == fileOffsetStart ? tupleStart : 0;
+            int end = frameOffset == fileOffsetEnd ? Math.min(tupleEnd, accessorProbe.getTupleCount())
+                    : accessorProbe.getTupleCount();
-    public RunFileReader getBuildRFReader(int pid) throws HyracksDataException {
-        return (buildRFWriters[pid] == null) ? null : (buildRFWriters[pid]).createReader();
-    }
-
-    public RunFileReader getProbeRFReader(int pid) throws HyracksDataException {
-        return (probeRFWriters[pid] == null) ? null : (probeRFWriters[pid]).createReader();
-    }
-
-    public void joinSpilledPartitions(IFrameWriter writer) throws HyracksDataException {
-        LinkedHashMap<Integer, LinkedHashSet<Integer>> probeInMemoryJoinMap;
-        while (ipjd.buildGetSpilledCount() > 0) {
-            // Load back spilled build partitions.
-            // TODO only load partition required for spill join. Consider both sides.
-            bringBackSpilledPartitionIfHasMoreMemory(true);
-
-            // Create in memory joiners.
-            // for (int pid = ipjd.buildNextInMemoryWithResults(0); pid >= 0; pid = ipjd
-            // .buildNextInMemoryWithResults(pid + 1)) {
-            // createInMemoryJoiner(pid);
-            // }
-
-            probeInMemoryJoinMap = ipjd.probeGetInMemoryJoinMap();
-
-            // Join all build partitions with disk probe partitions.
-            for (Entry<Integer, LinkedHashSet<Integer>> entry : probeInMemoryJoinMap.entrySet()) {
-                if (entry.getKey() == 221) {
-                    int t = 0;
-                }
-                System.err.println(" join pid: " + entry.getKey() + " with : " + probeInMemoryJoinMap);
-
-                if (ipjd.probeGetCount(entry.getKey()) > 0 && !probeInMemoryJoinMap.get(entry.getKey()).isEmpty()) {
-                    joinSpilledProbeWithBuildMemory(writer, probeInMemoryJoinMap, entry.getKey());
-                }
-            }
-
-            // Clean up build memory.
-            for (int pid = ipjd.buildNextInMemoryWithResults(0); pid >= 0; pid = ipjd
-                    .buildNextInMemoryWithResults(pid + 1)) {
-                closeInMemoryJoiner(pid, writer);
-                if (pid != buildParitialLoadPid) {
-                    ipjd.buildLogJoined(pid);
-                    ipjd.buildRemoveFromJoin(pid);
-                } else {
-                    int t = 0;
+            for (int i = start; i < end; ++i) {
+                // Tuple has potential match from build phase
+                for (IFrameBufferManager fbm : buildFbms) {
+                    joinTupleWithMemoryPartition(accessorProbe, i, fbm, writer);
                 }
             }
-            clearBuildMemory();
-        }
+            frameOffset = pReader.getReadPointer();
+            // Keep reading while the next frame starts inside this segment, or is the frame where the next
+            // segment begins and still holds this segment's leading tuples [0, tupleEnd).
+        } while ((frameOffset < fileOffsetEnd || (frameOffset == fileOffsetEnd && tupleEnd > 0))
+                && loadReaderNextFrame(pReader));
     }
- private void joinSpilledProbeWithBuildMemory(IFrameWriter writer,
- LinkedHashMap<Integer, LinkedHashSet<Integer>> probeInMemoryJoinMap, int probePid)
- throws HyracksDataException {
- RunFileReader pReader = getProbeRFReader(probePid);
- pReader.open();
- while (pReader.nextFrame(reloadBuffer)) {
+ private boolean loadReaderNextFrame(RunFileReader pReader) throws HyracksDataException {
+ if (pReader.nextFrame(reloadBuffer)) {
accessorProbe.reset(reloadBuffer.getBuffer());
spillReadCount++;
- for (int i = 0; i < accessorProbe.getTupleCount(); ++i) {
- // Tuple has potential match from build phase
- for (Integer j : probeInMemoryJoinMap.get(probePid)) {
- // j has join partitions that are Resident
- if (inMemJoiner[j] != null) {
- inMemJoiner[j].join(accessorProbe, i, writer);
- }
- }
- }
+ return true;
}
- pReader.close();
+ return false;
}
- class IntervalPartitionJoinData {
- private LinkedHashMap<Integer, LinkedHashSet<Integer>> probeJoinMap;
-
- private int[] buildPSizeInTups;
- private int[] probePSizeInTups;
-
- private BitSet buildJoinedCompleted; //0=waiting, 1=joined
- private BitSet buildSpilledStatus; //0=resident, 1=spilled
- private BitSet buildInMemoryStatus; //0=unknown, 1=resident
- private BitSet probeSpilledStatus; //0=resident, 1=spilled
-
- public IntervalPartitionJoinData(int k, IIntervalMergeJoinChecker imjc, int numberOfPartitions) {
- probeJoinMap = new LinkedHashMap<>();
-
- buildPSizeInTups = new int[numberOfPartitions];
- probePSizeInTups = new int[numberOfPartitions];
-
- buildJoinedCompleted = new BitSet(numberOfPartitions);
- buildInMemoryStatus = new BitSet(numberOfPartitions);
- buildSpilledStatus = new BitSet(numberOfPartitions);
- probeSpilledStatus = new BitSet(numberOfPartitions);
- }
-
- public LinkedHashMap<Integer, LinkedHashSet<Integer>> probeGetInMemoryJoinMap() {
- return IntervalPartitionUtil.getInMemorySpillJoinMap(probeJoinMap, buildInMemoryStatus, probeSpilledStatus);
- }
-
- public boolean hasProbeJoinMap(int pid) {
- return probeJoinMap.containsKey(pid);
- }
-
- public boolean isProbeJoinMapEmpty(int pid) {
- return probeJoinMap.get(pid).isEmpty();
- }
-
- public Iterator<Integer> getProbeJoinMap(int pid) {
- return probeJoinMap.get(pid).iterator();
- }
-
- public void setProbeJoinMap(int pid, LinkedHashSet<Integer> map) {
- probeJoinMap.put(new Integer(pid), map);
- for (Integer i : map) {
- if (buildIsSpilled(i)) {
- // Build join partition has spilled. Now spill the probe also.
- probeSpilledStatus.set(pid);
+ public void joinTupleWithMemoryPartition(IFrameTupleAccessor accessorProbe, int probeTupleIndex,
+ IFrameBufferManager fbm, IFrameWriter writer) throws HyracksDataException {
+ if (fbm.getNumFrames() == 0) {
+ return;
+ }
+ fbm.resetIterator();
+ int frameIndex = fbm.next();
+ while (fbm.exists()) {
+ fbm.getFrame(frameIndex, bufferInfo);
+ accessorBuild.reset(bufferInfo.getBuffer());
+ for (int buildTupleIndex = 0; buildTupleIndex < accessorBuild.getTupleCount(); ++buildTupleIndex) {
+ if (imjc.checkToSaveInResult(accessorBuild, buildTupleIndex, accessorProbe, probeTupleIndex, false)) {
+ appendToResult(accessorBuild, buildTupleIndex, accessorProbe, probeTupleIndex, writer);
}
+ joinComparisonCount++;
}
+ frameIndex = fbm.next();
}
+ }
- public void buildClearMemory() {
- buildInMemoryStatus.clear();
- }
-
- public void buildIncrementCount(int pid) {
- buildInMemoryStatus.set(pid);
- buildPSizeInTups[pid]++;
- }
-
- public int buildGetCount(int pid) {
- return buildPSizeInTups[pid];
- }
-
- public void buildLogJoined(int pid) {
- buildSpilledStatus.clear(pid);
- buildJoinedCompleted.set(pid);
- }
-
- public void buildRemoveFromJoin(int pid) {
- buildSpilledStatus.clear(pid);
- buildJoinedCompleted.set(pid);
- }
-
- public boolean buildHasBeenJoined(int pid) {
- return buildJoinedCompleted.get(pid);
- }
-
- public int buildGetSpilledCount() {
- return buildSpilledStatus.cardinality();
- }
-
- public void buildSpill(int pid) {
- buildInMemoryStatus.clear(pid);
- buildSpilledStatus.set(pid);
- }
-
- public void buildLoad(int pid) {
- buildInMemoryStatus.set(pid);
- buildSpilledStatus.clear(pid);
- }
-
- public boolean buildIsSpilled(int pid) {
- return buildSpilledStatus.get(pid);
- }
-
- public int buildNextSpilled(int pid) {
- return buildSpilledStatus.nextSetBit(pid);
- }
-
- public int buildNextInMemoryWithResults(int pid) {
- int nextPid = buildNextInMemory(pid);
- do {
- if (nextPid < 0 || buildGetCount(nextPid) > 0) {
- return nextPid;
- }
- nextPid = buildNextInMemory(nextPid + 1);
- } while (nextPid >= 0);
- return -1;
- }
+ private void appendToResult(IFrameTupleAccessor accessorBuild, int buildSidetIx, IFrameTupleAccessor accessorProbe,
+ int probeSidetIx, IFrameWriter writer) throws HyracksDataException {
+ FrameUtils.appendConcatToWriter(writer, resultAppender, accessorBuild, buildSidetIx, accessorProbe,
+ probeSidetIx);
+ joinResultCount++;
+ }
- public int buildNextInMemory(int pid) {
- int nextPid = buildSpilledStatus.nextClearBit(pid);
- if (nextPid >= numOfPartitions) {
- return -1;
+    /**
+     * Loads build (right) tuples into the partitioned build buffer until the buffer is full or
+     * {@link #loadRightTuple()} stops returning loaded tuples. Every partition that receives a tuple is recorded
+     * once in {@code buildInMemoryPartitions}; {@code moreBuildProcessing} is cleared when the build input
+     * reports an empty status.
+     *
+     * @throws HyracksDataException
+     *             if a tuple does not fit even though no build partition was filled in this round
+     */
+    private void fillMemory() throws HyracksDataException {
+        int buildPid = -1;
+        TupleStatus ts;
+        for (ts = loadRightTuple(); ts.isLoaded(); ts = loadRightTuple()) {
+            int pid = buildHpc.partition(inputAccessor[RIGHT_PARTITION], inputAccessor[RIGHT_PARTITION].getTupleId(),
+                    k);
+            if (!buildBufferManager.insertTuple(pid, inputAccessor[RIGHT_PARTITION],
+                    inputAccessor[RIGHT_PARTITION].getTupleId(), tempPtr)) {
+                if (buildInMemoryPartitions.isEmpty()) {
+                    // Nothing was filled in this round, so clearing memory cannot make room for this tuple;
+                    // returning here would make the caller's loop spin forever.
+                    throw new HyracksDataException(
+                            "No more space left in the memory buffer, please give join more memory budgets.");
+                }
+                // Memory is full. The tuple is not advanced, so presumably the next round retries it.
+                return;
             }
-            do {
-                if (!buildHasBeenJoined(nextPid)) {
-                    return nextPid;
-                }
-                nextPid = buildSpilledStatus.nextClearBit(nextPid + 1);
-            } while (nextPid >= 0 && nextPid < numOfPartitions);
-            return -1;
-        }
-
-        public void probeIncrementCount(int pid) {
-            probePSizeInTups[pid]++;
-        }
-
-        public int probeGetCount(int pid) {
-            return probePSizeInTups[pid];
-        }
-
-        public void probeSpill(int pid) {
-            probeSpilledStatus.set(pid);
-        }
-        public boolean probeHasSpilled(int pid) {
-            return probeSpilledStatus.get(pid);
-        }
-
-        public int buildGetMaxPartitionSize() {
-            int max = buildPSizeInTups[0];
-            for (int i = 1; i < buildPSizeInTups.length; i++) {
-                if (buildPSizeInTups[i] > max) {
-                    max = buildPSizeInTups[i];
-                }
+            if (buildPid != pid) {
+                // Track each resident partition once: a pid may reappear after other partitions, and a duplicate
+                // entry would make the partition join (and be cleared) twice, duplicating results.
+                if (!buildInMemoryPartitions.contains(pid)) {
+                    buildInMemoryPartitions.add(pid);
+                }
+                buildPid = pid;
             }
-            return max;
+            inputAccessor[RIGHT_PARTITION].next();
+            buildSize++;
         }
-
-        public int probeGetMaxPartitionSize() {
-            int max = probePSizeInTups[0];
-            for (int i = 1; i < probePSizeInTups.length; i++) {
-                if (probePSizeInTups[i] > max) {
-                    max = probePSizeInTups[i];
-                }
-            }
-            return max;
+        if (ts.isEmpty()) {
+            moreBuildProcessing = false;
         }
-
     }
- public void closeAndDeleteRunFiles() throws HyracksDataException {
- for (RunFileWriter rfw : buildRFWriters) {
- if (rfw != null) {
- FileUtils.deleteQuietly(rfw.getFileReference().getFile());
- }
- }
- for (RunFileWriter rfw : probeRFWriters) {
- if (rfw != null) {
- FileUtils.deleteQuietly(rfw.getFileReference().getFile());
- }
- }
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("IntervalPartitionJoiner statitics: " + joinComparisonCount + " comparisons, "
- + joinResultCount + " results, " + spillWriteCount + " written, " + spillReadCount + " read.");
+ private TupleStatus loadRightTuple() throws HyracksDataException {
+ TupleStatus loaded = loadMemoryTuple(RIGHT_PARTITION);
+ if (loaded == TupleStatus.UNKNOWN) {
+ loaded = pauseAndLoadRightTuple();
}
+ return loaded;
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0b900514/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java
index 453287d..671c082 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java
@@ -18,14 +18,6 @@
*/
package org.apache.asterix.runtime.operators.joins.intervalpartition;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.Map.Entry;
-
-import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinChecker;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
import org.apache.hyracks.api.dataflow.value.IRangeMap;
@@ -109,27 +101,6 @@ public class IntervalPartitionUtil {
return (k * k + k) / 2;
}
- public static void printJoinPartitionMap(ArrayList<HashSet<Integer>> partitionMap) {
- for (int i = 0; i < partitionMap.size(); ++i) {
- System.out.print("(hashset) Partition " + i + " must join with partition(s): ");
- for (Integer map : partitionMap.get(i)) {
- System.out.print(map + " ");
- }
- System.out.println("");
- }
- }
-
- public static void printPartitionMap(int k) {
- for (int i = 0; i < k; ++i) {
- for (int j = i; j < k; ++j) {
- int pid = intervalPartitionMap(i, j, k);
- Pair<Integer, Integer> partition = getIntervalPartition(pid, k);
- System.out.println("Map partition (" + i + ", " + j + ") to partition id: " + pid + " back to pair ("
- + partition.first + ", " + partition.second + ")");
- }
- }
- }
-
/**
* Map the partition start and end points to a single value.
* The mapped partitions are sorted in interval starting at 0.
@@ -192,47 +163,6 @@ public class IntervalPartitionUtil {
return partitionEnd;
}
- public static LinkedHashSet<Integer> getProbeJoinPartitions(int pid, int[] buildPSizeInTups,
- IIntervalMergeJoinChecker imjc, int k) {
- LinkedHashSet<Integer> joinMap = new LinkedHashSet<>();
- Pair<Integer, Integer> map = getIntervalPartition(pid, k);
- int probeStart = map.first;
- int probeEnd = map.second;
- // Build partitions with data
- for (int buildStart = 0; buildStart < k; ++buildStart) {
- for (int buildEnd = k - 1; buildStart <= buildEnd; --buildEnd) {
- int buildId = intervalPartitionMap(buildStart, buildEnd, k);
- if (buildPSizeInTups[buildId] > 0) {
- // Join partitions for probe's pid
- if (!(buildStart == 0 && probeStart == 0)
- && imjc.compareIntervalPartition(buildStart, buildEnd, probeStart, probeEnd)) {
- joinMap.add(buildId);
- }
- }
- }
- }
- return joinMap;
- }
-
- public static LinkedHashMap<Integer, LinkedHashSet<Integer>> getInMemorySpillJoinMap(
- LinkedHashMap<Integer, LinkedHashSet<Integer>> probeJoinMap, BitSet buildInMemoryStatus,
- BitSet probeSpilledStatus) {
- LinkedHashMap<Integer, LinkedHashSet<Integer>> inMemoryMap = new LinkedHashMap<>();
- for (Entry<Integer, LinkedHashSet<Integer>> entry : probeJoinMap.entrySet()) {
- if (probeSpilledStatus.get(entry.getKey())) {
- for (Integer i : entry.getValue()) {
- if (buildInMemoryStatus.get(i)) {
- if (!inMemoryMap.containsKey(entry.getKey())) {
- inMemoryMap.put(entry.getKey(), new LinkedHashSet<Integer>());
- }
- inMemoryMap.get(entry.getKey()).add(i);
- }
- }
- }
- }
- return inMemoryMap;
- }
-
public static long getPartitionDuration(long partitionStart, long partitionEnd, int k) throws HyracksDataException {
if (k <= 2) {
throw new HyracksDataException("k is to small for interval partitioner.");