You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this plain-text copy.
Posted to commits@asterixdb.apache.org by pr...@apache.org on 2016/10/17 19:54:55 UTC
[05/50] [abbrv] asterixdb git commit: added more to the interval hint
added more to the interval hint
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/fd84e345
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/fd84e345
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/fd84e345
Branch: refs/heads/ecarm002/interval_join_merge
Commit: fd84e345ec29f9b85f32132eef1f6e7f98941866
Parents: 0f533e8
Author: Preston Carman <pr...@apache.org>
Authored: Wed Jul 6 18:15:43 2016 -0700
Committer: Preston Carman <pr...@apache.org>
Committed: Wed Jul 6 18:15:43 2016 -0700
----------------------------------------------------------------------
.../IntervalPartitionJoinPOperator.java | 20 +-
.../rules/IntervalSplitPartitioningRule.java | 1 -
.../asterix/optimizer/rules/util/JoinUtils.java | 21 +-
.../interval_overlapping.11.query.aql | 2 +-
.../interval_overlapping.12.query.aql | 2 +-
.../IntervalJoinExpressionAnnotation.java | 65 +-
...econdaryIndexSearchExpressionAnnotation.java | 15 +-
.../asterix-lang-aql/src/main/javacc/AQL.jj | 4 +-
.../IntervalPartitionJoin.java | 649 -------------------
...IntervalPartitionJoinOperatorDescriptor.java | 16 +-
.../IntervalPartitionJoiner.java | 649 +++++++++++++++++++
.../IntervalPartitionUtil.java | 26 +-
.../AbstractExpressionAnnotation.java | 35 +
.../ExpressionAnnotationNoCopyImpl.java | 14 +-
.../IndexedNLJoinExpressionAnnotation.java | 14 +-
15 files changed, 788 insertions(+), 745 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java
index 414d0b4..1eff2aa 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java
@@ -33,18 +33,18 @@ import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
public class IntervalPartitionJoinPOperator extends AbstractIntervalJoinPOperator {
private final int memSizeInFrames;
- private final int probeTupleCount;
- private final int probeMaxDuration;
- private final int buildTupleCount;
- private final int buildMaxDuration;
+ private final long probeTupleCount;
+ private final long probeMaxDuration;
+ private final long buildTupleCount;
+ private final long buildMaxDuration;
private final int avgTuplesInFrame;
private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoinPOperator.class.getName());
public IntervalPartitionJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType,
List<LogicalVariable> sideLeftOfEqualities, List<LogicalVariable> sideRightOfEqualities,
- int memSizeInFrames, int buildTupleCount, int probeTupleCount, int buildMaxDuration, int probeMaxDuration,
- int avgTuplesInFrame, IIntervalMergeJoinCheckerFactory mjcf, IRangeMap rangeMap) {
+ int memSizeInFrames, long buildTupleCount, long probeTupleCount, long buildMaxDuration,
+ long probeMaxDuration, int avgTuplesInFrame, IIntervalMergeJoinCheckerFactory mjcf, IRangeMap rangeMap) {
super(kind, partitioningType, sideLeftOfEqualities, sideRightOfEqualities, mjcf, rangeMap);
this.memSizeInFrames = memSizeInFrames;
this.buildTupleCount = buildTupleCount;
@@ -62,19 +62,19 @@ public class IntervalPartitionJoinPOperator extends AbstractIntervalJoinPOperato
+ ".");
}
- public int getProbeTupleCount() {
+ public long getProbeTupleCount() {
return probeTupleCount;
}
- public int getProbeMaxDuration() {
+ public long getProbeMaxDuration() {
return probeMaxDuration;
}
- public int getBuildTupleCount() {
+ public long getBuildTupleCount() {
return buildTupleCount;
}
- public int getBuildMaxDuration() {
+ public long getBuildMaxDuration() {
return buildMaxDuration;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java
index 2772e68..629606c 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java
@@ -370,7 +370,6 @@ public class IntervalSplitPartitioningRule implements IAlgebraicRewriteRule {
flags[i] = true;
}
ReplicateOperator splitOperator = new ReplicateOperator(flags.length, flags);
- // ReplicatePOperator splitPOperator = new ReplicatePOperator();
IntervalLocalRangeSplitterPOperator splitPOperator = new IntervalLocalRangeSplitterPOperator(joinKeyLogicalVars,
rangeMap);
splitOperator.setPhysicalOperator(splitPOperator);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java
index b2f010c..14b7aa6 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java
@@ -27,6 +27,8 @@ import java.util.logging.Logger;
import org.apache.asterix.algebra.operators.physical.IntervalIndexJoinPOperator;
import org.apache.asterix.algebra.operators.physical.IntervalPartitionJoinPOperator;
import org.apache.asterix.common.annotations.IntervalJoinExpressionAnnotation;
+import org.apache.asterix.common.annotations.JoinIntervalMaxDurationExpressionAnnotation;
+import org.apache.asterix.common.annotations.JoinRecordCountsExpressionAnnotation;
import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
import org.apache.asterix.runtime.operators.joins.AfterIntervalMergeJoinCheckerFactory;
import org.apache.asterix.runtime.operators.joins.BeforeIntervalMergeJoinCheckerFactory;
@@ -89,7 +91,7 @@ public class JoinUtils {
} else if (ijea.isPartitionJoin()) {
// Overlapping Interval Partition.
LOGGER.fine("Interval Join - Cluster Parititioning");
- setIntervalPartitionJoinOp(op, fi, sideLeft, sideRight, ijea.getRangeMap(), context);
+ setIntervalPartitionJoinOp(op, fi, sideLeft, sideRight, ijea.getRangeMap(), ijea, context);
} else if (ijea.isSpatialJoin()) {
// Spatial Partition.
LOGGER.fine("Interval Join - Spatial Partitioning");
@@ -112,6 +114,7 @@ public class JoinUtils {
return null;
}
+
private static void setSortMergeIntervalJoinOp(AbstractBinaryJoinOperator op, FunctionIdentifier fi,
List<LogicalVariable> sideLeft, List<LogicalVariable> sideRight, IRangeMap rangeMap,
IOptimizationContext context) {
@@ -122,12 +125,20 @@ public class JoinUtils {
private static void setIntervalPartitionJoinOp(AbstractBinaryJoinOperator op, FunctionIdentifier fi,
List<LogicalVariable> sideLeft, List<LogicalVariable> sideRight, IRangeMap rangeMap,
- IOptimizationContext context) {
+ IntervalJoinExpressionAnnotation ijea, IOptimizationContext context) {
+ long leftCount = ijea.getLeftRecordCount() > 0 ? ijea.getLeftRecordCount()
+ : getCardinality(sideLeft, context);
+ long rightCount = ijea.getRightRecordCount() > 0 ? ijea.getRightRecordCount()
+ : getCardinality(sideRight, context);
+ long leftMaxDuration = ijea.getLeftMaxDuration() > 0 ? ijea.getLeftMaxDuration()
+ : getMaxDuration(sideLeft, context);
+ long rightMaxDuration = ijea.getRightMaxDuration() > 0 ? ijea.getRightMaxDuration()
+ : getMaxDuration(sideRight, context);
+
IIntervalMergeJoinCheckerFactory mjcf = getIntervalMergeJoinCheckerFactory(fi, rangeMap);
op.setPhysicalOperator(new IntervalPartitionJoinPOperator(op.getJoinKind(), JoinPartitioningType.BROADCAST,
- sideLeft, sideRight, context.getPhysicalOptimizationConfig().getMaxFramesForJoin(),
- getCardinality(sideLeft, context), getCardinality(sideRight, context),
- getMaxDuration(sideLeft, context), getMaxDuration(sideRight, context),
+ sideLeft, sideRight, context.getPhysicalOptimizationConfig().getMaxFramesForJoin(), leftCount,
+ rightCount, leftMaxDuration, rightMaxDuration,
context.getPhysicalOptimizationConfig().getMaxRecordsPerFrame(), mjcf, rangeMap));
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/asterixdb/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlapping/interval_overlapping.11.query.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlapping/interval_overlapping.11.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlapping/interval_overlapping.11.query.aql
index 6222c86..1fe23da 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlapping/interval_overlapping.11.query.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlapping/interval_overlapping.11.query.aql
@@ -26,7 +26,7 @@ use dataverse TinyCollege;
for $f in dataset Staff
for $d in dataset Students
-where /*+ interval-partition-join [10000, 11000, 12000, 14000, 15000] */ interval-overlapping($d.attendance, $f.employment)
+where /*+ interval-partition-join [10000,11000,12000,14000,15000] 7 7 400 400 */ interval-overlapping($d.attendance, $f.employment)
/*+ range ["F", "L", "R"] */
order by $f.name, $d.name
return { "staff" : $f.name, "student" : $d.name }
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/asterixdb/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlapping/interval_overlapping.12.query.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlapping/interval_overlapping.12.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlapping/interval_overlapping.12.query.aql
index 337221d..2057130 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlapping/interval_overlapping.12.query.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlapping/interval_overlapping.12.query.aql
@@ -26,7 +26,7 @@ use dataverse TinyCollege;
for $f in dataset Staff
for $d in dataset Students
-where /*+ interval-partition-join [10000, 11000, 12000, 14000, 15000] */ interval-overlapping($f.employment, $d.attendance)
+where /*+ interval-partition-join [10000,11000,12000,14000,15000] 7 7 400 400 */ interval-overlapping($f.employment, $d.attendance)
/*+ range ["F", "L", "R"] */
order by $f.name, $d.name
return { "staff" : $f.name, "student" : $d.name }
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/annotations/IntervalJoinExpressionAnnotation.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/annotations/IntervalJoinExpressionAnnotation.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/annotations/IntervalJoinExpressionAnnotation.java
index 342b9e8..f2f325d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/annotations/IntervalJoinExpressionAnnotation.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/annotations/IntervalJoinExpressionAnnotation.java
@@ -18,10 +18,11 @@
*/
package org.apache.asterix.common.annotations;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractExpressionAnnotation;
import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionAnnotation;
import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
-public class IntervalJoinExpressionAnnotation implements IExpressionAnnotation {
+public class IntervalJoinExpressionAnnotation extends AbstractExpressionAnnotation {
private static final String RAW_HINT_STRING = "interval-raw-join";
private static final String PARTITION_HINT_STRING = "interval-partition-join";
@@ -30,19 +31,13 @@ public class IntervalJoinExpressionAnnotation implements IExpressionAnnotation {
private static final String INDEX_HINT_STRING = "interval-index-join";
public static final IntervalJoinExpressionAnnotation INSTANCE = new IntervalJoinExpressionAnnotation();
- private Object object;
- private IRangeMap map;
- private String joinType;
+ private IRangeMap map = null;
+ private String joinType = null;
+ private long leftMaxDuration = -1;
+ private long rightMaxDuration = -1;
+ private long leftRecordCount = -1;
+ private long rightRecordCount = -1;
- @Override
- public Object getObject() {
- return object;
- }
-
- @Override
- public void setObject(Object object) {
- this.object = object;
- }
@Override
public IExpressionAnnotation copy() {
@@ -51,15 +46,25 @@ public class IntervalJoinExpressionAnnotation implements IExpressionAnnotation {
return clone;
}
- public void setRangeMap(IRangeMap map) {
- this.map = map;
+ @Override
+ public void setObject(Object object) {
+ super.setObject(object);
+ parseHint();
}
- public IRangeMap getRangeMap() {
- return map;
+ private void parseHint() {
+ String[] args = ((String) object).split(" ");
+ setJoinType(args[0]);
+
+ if (joinType.equals(PARTITION_HINT_STRING) && args.length == 6) {
+ leftRecordCount = Long.valueOf(args[2]);
+ rightRecordCount = Long.valueOf(args[3]);
+ leftMaxDuration = Long.valueOf(args[4]);
+ rightMaxDuration = Long.valueOf(args[5]);
+ }
}
- public void setJoinType(String hint) {
+ private void setJoinType(String hint) {
if (hint.startsWith(RAW_HINT_STRING)) {
joinType = RAW_HINT_STRING;
} else if (hint.startsWith(PARTITION_HINT_STRING)) {
@@ -73,6 +78,30 @@ public class IntervalJoinExpressionAnnotation implements IExpressionAnnotation {
}
}
+ public long getLeftMaxDuration() {
+ return leftMaxDuration;
+ }
+
+ public long getRightMaxDuration() {
+ return rightMaxDuration;
+ }
+
+ public long getLeftRecordCount() {
+ return leftRecordCount;
+ }
+
+ public long getRightRecordCount() {
+ return rightRecordCount;
+ }
+
+ public void setRangeMap(IRangeMap map) {
+ this.map = map;
+ }
+
+ public IRangeMap getRangeMap() {
+ return map;
+ }
+
public String getRangeType() {
return joinType;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/annotations/SkipSecondaryIndexSearchExpressionAnnotation.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/annotations/SkipSecondaryIndexSearchExpressionAnnotation.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/annotations/SkipSecondaryIndexSearchExpressionAnnotation.java
index e1dd1cb..de1e1fa 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/annotations/SkipSecondaryIndexSearchExpressionAnnotation.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/annotations/SkipSecondaryIndexSearchExpressionAnnotation.java
@@ -18,25 +18,14 @@
*/
package org.apache.asterix.common.annotations;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractExpressionAnnotation;
import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionAnnotation;
-public class SkipSecondaryIndexSearchExpressionAnnotation implements IExpressionAnnotation {
+public class SkipSecondaryIndexSearchExpressionAnnotation extends AbstractExpressionAnnotation {
public static final String HINT_STRING = "skip-index";
public static final SkipSecondaryIndexSearchExpressionAnnotation INSTANCE = new SkipSecondaryIndexSearchExpressionAnnotation();
- private Object object;
-
- @Override
- public Object getObject() {
- return object;
- }
-
- @Override
- public void setObject(Object object) {
- this.object = object;
- }
-
@Override
public IExpressionAnnotation copy() {
SkipSecondaryIndexSearchExpressionAnnotation clone = new SkipSecondaryIndexSearchExpressionAnnotation();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
index 80ddeef..bbba41d 100644
--- a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
+++ b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
@@ -2112,10 +2112,10 @@ Expression FunctionCallExpr() throws ParseException:
if (IntervalJoinExpressionAnnotation.isIntervalJoinHint(hint)) {
IntervalJoinExpressionAnnotation ijea = IntervalJoinExpressionAnnotation.INSTANCE;
ijea.setObject(hint);
- ijea.setJoinType(hint);
try {
if (ijea.hasRangeArgument()) {
- ijea.setRangeMap(RangeMapBuilder.parseHint(hint.substring(IntervalJoinExpressionAnnotation.getHintLength(hint))));
+ String rangeHint = hint.substring(IntervalJoinExpressionAnnotation.getHintLength(hint), hint.indexOf(']', 0) + 1);
+ ijea.setRangeMap(RangeMapBuilder.parseHint(rangeHint));
}
} catch (AsterixException e) {
{if (true) throw new ParseException(e.getMessage());}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoin.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoin.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoin.java
deleted file mode 100644
index 1bccbd2..0000000
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoin.java
+++ /dev/null
@@ -1,649 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.operators.joins.intervalpartition;
-
-import java.nio.ByteBuffer;
-import java.util.BitSet;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.Map.Entry;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinChecker;
-import org.apache.commons.io.FileUtils;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.io.RunFileReader;
-import org.apache.hyracks.dataflow.common.io.RunFileWriter;
-import org.apache.hyracks.dataflow.std.buffermanager.IPartitionedMemoryConstrain;
-import org.apache.hyracks.dataflow.std.buffermanager.VPartitionTupleBufferManager;
-import org.apache.hyracks.dataflow.std.structures.TuplePointer;
-
-/**
- * This class mainly applies one level of HHJ on a pair of
- * relations. It is always called by the descriptor.
- */
-public class IntervalPartitionJoin {
-
- // Used for special probe BigObject which can not be held into the Join memory
- private FrameTupleAppender bigProbeFrameAppender;
-
- enum SIDE {
- BUILD,
- PROBE
- }
-
- private IHyracksTaskContext ctx;
-
- private final String buildRelName;
- private final String probeRelName;
-
- private final ITuplePartitionComputer buildHpc;
- private final ITuplePartitionComputer probeHpc;
-
- private final RecordDescriptor buildRd;
- private final RecordDescriptor probeRd;
-
- private RunFileWriter[] buildRFWriters; //writing spilled build partitions
- private RunFileWriter[] probeRFWriters; //writing spilled probe partitions
-
- private final int memForJoin;
- private final int k;
- private final int numOfPartitions;
- private InMemoryIntervalPartitionJoin[] inMemJoiner; //Used for joining resident partitions
-
- private VPartitionTupleBufferManager buildBufferManager;
- private VPartitionTupleBufferManager probeBufferManager;
-
- private final FrameTupleAccessor accessorBuild;
- private final FrameTupleAccessor accessorProbe;
-
- private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoin.class.getName());
-
- // stats information
- private IntervalPartitionJoinData ipjd;
-
- private IFrame reloadBuffer;
- private TuplePointer tempPtr = new TuplePointer();
-
- private IIntervalMergeJoinChecker imjc;
-
- public IntervalPartitionJoin(IHyracksTaskContext ctx, int memForJoin, int k, int numOfPartitions,
- String buildRelName, String probeRelName, IIntervalMergeJoinChecker imjc, RecordDescriptor buildRd,
- RecordDescriptor probeRd, ITuplePartitionComputer buildHpc, ITuplePartitionComputer probeHpc) {
- this.ctx = ctx;
- this.memForJoin = memForJoin;
- this.k = k;
- this.buildRd = buildRd;
- this.probeRd = probeRd;
- this.buildHpc = buildHpc;
- this.probeHpc = probeHpc;
- this.imjc = imjc;
- this.buildRelName = buildRelName;
- this.probeRelName = probeRelName;
-
- this.numOfPartitions = numOfPartitions;
- this.buildRFWriters = new RunFileWriter[numOfPartitions];
- this.probeRFWriters = new RunFileWriter[numOfPartitions];
- this.inMemJoiner = new InMemoryIntervalPartitionJoin[numOfPartitions];
-
- this.accessorBuild = new FrameTupleAccessor(buildRd);
- this.accessorProbe = new FrameTupleAccessor(probeRd);
-
- ipjd = new IntervalPartitionJoinData(k, imjc, numOfPartitions);
- }
-
- public void initBuild() throws HyracksDataException {
- buildBufferManager = new VPartitionTupleBufferManager(ctx, getPartitionMemoryConstrain(), numOfPartitions,
- memForJoin * ctx.getInitialFrameSize());
- }
-
- private IPartitionedMemoryConstrain getPartitionMemoryConstrain() {
- return VPartitionTupleBufferManager.NO_CONSTRAIN;
- }
-
- public void build(ByteBuffer buffer) throws HyracksDataException {
- accessorBuild.reset(buffer);
- int tupleCount = accessorBuild.getTupleCount();
-
- for (int i = 0; i < tupleCount; ++i) {
- int pid = buildHpc.partition(accessorBuild, i, k);
- processTuple(i, pid);
- ipjd.buildIncrementCount(pid);
- }
- }
-
- public void closeBuild() throws HyracksDataException {
- int inMemoryPartitions = 0;
- int totalBuildPartitions = 0;
- flushAndClearBuildSpilledPartition();
-
- // Trying to bring back as many spilled partitions as possible, making them resident
- bringBackSpilledPartitionIfHasMoreMemory();
-
- // Update build partition join map based on partitions with actual data.
- for (int i = ipjd.buildNextInMemory(0); i >= 0; i = ipjd.buildNextInMemory(i + 1)) {
- if (ipjd.buildGetCount(i) == 0) {
- ipjd.buildRemoveFromJoin(i);
- } else if (ipjd.buildGetCount(i) > 0) {
- // Set up build memory for processing joins for partitions in memory.
- createInMemoryJoiner(i);
- inMemoryPartitions++;
- totalBuildPartitions += ipjd.buildGetCount(i);
- }
- }
-
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine("IntervalPartitionJoin has closed the build phase. Total tuples: " + totalBuildPartitions
- + ", In memory partitions: " + inMemoryPartitions + ", Spilled partitions: "
- + ipjd.buildGetSpilledCount());
- }
- }
-
- private void processTuple(int tid, int pid) throws HyracksDataException {
- while (!buildBufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) {
- int victimPartition = selectPartitionToSpill();
- if (victimPartition < 0) {
- throw new HyracksDataException(
- "No more space left in the memory buffer, please give join more memory budgets.");
- }
- spillPartition(victimPartition);
- }
- }
-
- private int selectPartitionToSpill() {
- int partitionToSpill = selectLargestSpilledPartition();
- int maxToSpillPartSize = 0;
- if (partitionToSpill < 0 || (maxToSpillPartSize = buildBufferManager.getPhysicalSize(partitionToSpill)) == ctx
- .getInitialFrameSize()) {
- int partitionInMem = selectNextInMemoryPartitionToSpill();
- if (partitionInMem >= 0 && buildBufferManager.getPhysicalSize(partitionInMem) > maxToSpillPartSize) {
- partitionToSpill = partitionInMem;
- }
- }
- return partitionToSpill;
- }
-
- /**
- * Select next partition to spill. The partitions have been numbered in the order they should be spilled.
- *
- * @return
- */
- private int selectNextInMemoryPartitionToSpill() {
- for (int i = ipjd.buildNextInMemoryWithResults(0); i >= 0; i = ipjd.buildNextInMemoryWithResults(i + 1)) {
- if (!ipjd.buildIsSpilled(i) && buildBufferManager.getPhysicalSize(i) > 0) {
- return i;
- }
- }
- return -1;
- }
-
- private int selectLargestSpilledPartition() {
- int pid = -1;
- int max = 0;
- for (int i = ipjd.buildNextSpilled(0); i >= 0; i = ipjd.buildNextSpilled(i + 1)) {
- int partSize = buildBufferManager.getPhysicalSize(i);
- if (partSize > max) {
- max = partSize;
- pid = i;
- }
- }
- return pid;
- }
-
- private void spillPartition(int pid) throws HyracksDataException {
- RunFileWriter writer = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.BUILD);
- buildBufferManager.flushPartition(pid, writer);
- buildBufferManager.clearPartition(pid);
- ipjd.buildSpill(pid);
- }
-
- private RunFileWriter getSpillWriterOrCreateNewOneIfNotExist(int pid, SIDE whichSide) throws HyracksDataException {
- RunFileWriter[] runFileWriters = null;
- String refName = null;
- switch (whichSide) {
- case BUILD:
- runFileWriters = buildRFWriters;
- refName = buildRelName;
- break;
- case PROBE:
- refName = probeRelName;
- runFileWriters = probeRFWriters;
- break;
- default:
- }
- RunFileWriter writer = runFileWriters[pid];
- if (writer == null) {
- FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(refName);
- writer = new RunFileWriter(file, ctx.getIOManager());
- writer.open();
- runFileWriters[pid] = writer;
- }
- return writer;
- }
-
- public void clearBuildMemory() throws HyracksDataException {
- for (int pid = 0; pid < numOfPartitions; ++pid) {
- if (buildBufferManager.getNumTuples(pid) > 0) {
- buildBufferManager.clearPartition(pid);
- ipjd.buildRemoveFromJoin(pid);
- }
- }
- }
-
- private void flushAndClearBuildSpilledPartition() throws HyracksDataException {
- for (int pid = ipjd.buildNextSpilled(0); pid >= 0; pid = ipjd.buildNextSpilled(pid + 1)) {
- if (buildBufferManager.getNumTuples(pid) > 0) {
- buildBufferManager.flushPartition(pid, getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.BUILD));
- buildBufferManager.clearPartition(pid);
- buildRFWriters[pid].close();
- }
- }
- }
-
- private void flushAndClearProbeSpilledPartition() throws HyracksDataException {
- for (int pid = 0; pid < numOfPartitions; ++pid) {
- if (probeBufferManager.getNumTuples(pid) > 0) {
- probeBufferManager.flushPartition(pid, getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.PROBE));
- probeBufferManager.clearPartition(pid);
- probeRFWriters[pid].close();
- }
- }
- }
-
- private void bringBackSpilledPartitionIfHasMoreMemory() throws HyracksDataException {
- // we need number of |spilledPartitions| buffers to store the probe data
- int freeSpace = (memForJoin - ipjd.buildGetSpilledCount()) * ctx.getInitialFrameSize();
- for (int i = ipjd.buildNextInMemoryWithResults(0); i >= 0; i = ipjd.buildNextInMemoryWithResults(i + 1)) {
- freeSpace -= buildBufferManager.getPhysicalSize(i);
- }
-
- int pid = 0;
- while ((pid = selectPartitionsToReload(freeSpace, pid)) >= 0) {
- if (!loadPartitionInMem(pid, buildRFWriters[pid])) {
- return;
- }
- freeSpace -= buildBufferManager.getPhysicalSize(pid);
- }
- }
-
- private boolean loadPartitionInMem(int pid, RunFileWriter wr) throws HyracksDataException {
- RunFileReader r = wr.createDeleteOnCloseReader();
- r.open();
- if (reloadBuffer == null) {
- reloadBuffer = new VSizeFrame(ctx);
- }
- while (r.nextFrame(reloadBuffer)) {
- accessorBuild.reset(reloadBuffer.getBuffer());
- for (int tid = 0; tid < accessorBuild.getTupleCount(); tid++) {
- if (!buildBufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) {
- // for some reason (e.g. due to fragmentation) if the inserting failed, we need to clear the occupied frames
- buildBufferManager.clearPartition(pid);
- r.close();
- return false;
- }
- }
- }
-
- r.close();
- ipjd.buildLoad(pid);
- buildRFWriters[pid] = null;
- return true;
- }
-
- private int selectPartitionsToReload(int freeSpace, int pid) {
- for (int id = ipjd.buildNextSpilled(0); id >= 0; id = ipjd.buildNextSpilled(id + 1)) {
- assert buildRFWriters[id].getFileSize() > 0 : "How comes a spilled partition have size 0?";
- if (freeSpace >= buildRFWriters[id].getFileSize()) {
- return id;
- }
- }
- return -1;
- }
-
- private void createInMemoryJoiner(int pid) throws HyracksDataException {
- this.inMemJoiner[pid] = new InMemoryIntervalPartitionJoin(ctx,
- buildBufferManager.getPartitionFrameBufferManager(pid), imjc, buildRd, probeRd);
- }
-
- private void closeInMemoryJoiner(int pid, IFrameWriter writer) throws HyracksDataException {
- this.inMemJoiner[pid].closeJoin(writer);
- this.inMemJoiner[pid] = null;
- }
-
- public void initProbe() throws HyracksDataException {
- int probeMemory = numOfPartitions > memForJoin ? memForJoin : numOfPartitions;
- probeBufferManager = new VPartitionTupleBufferManager(ctx, getPartitionMemoryConstrain(), numOfPartitions,
- (probeMemory) * ctx.getInitialFrameSize());
-
- probeRFWriters = new RunFileWriter[numOfPartitions];
- }
-
- /**
- * Processes one frame of probe tuples. Each tuple is routed to a probe partition by probeHpc; the partition's
- * join map (candidate build partitions) is computed on first use. If the map is non-empty, the tuple is
- * buffered for later when the probe partition is marked spilled, and it is joined right away with every
- * candidate build partition that has a resident in-memory joiner. The probe tuple count is always incremented.
- */
- public void probe(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
- accessorProbe.reset(buffer);
- int tupleCount = accessorProbe.getTupleCount();
-
- for (int i = 0; i < tupleCount; ++i) {
- int pid = probeHpc.partition(accessorProbe, i, k);
- if (!ipjd.hasProbeJoinMap(pid)) {
- // Set probe join map
- ipjd.setProbeJoinMap(pid,
- IntervalPartitionUtil.getProbeJoinPartitions(pid, ipjd.buildPSizeInTups, imjc, k));
- }
-
- // Tuple has potential match from build phase
- if (!ipjd.isProbeJoinMapEmpty(pid)) {
- if (ipjd.probeHasSpilled(pid)) {
- // pid is Spilled
- while (!probeBufferManager.insertTuple(pid, accessorProbe, i, tempPtr)) {
- int victim = pid;
- if (probeBufferManager.getNumTuples(pid) == 0) {
- // current pid is empty, choose the biggest one
- // NOTE(review): selectLargestSpilledPartition() (defined earlier in this file, not shown) appears
- // to rank partitions by the build-side buffer, not probeBufferManager; confirm this is intended.
- victim = selectLargestSpilledPartition();
- }
- if (victim < 0) {
- // current tuple is too big for all the free space
- flushBigProbeObjectToDisk(pid, accessorProbe, i);
- break;
- }
- RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(victim, SIDE.PROBE);
- probeBufferManager.flushPartition(victim, runFileWriter);
- probeBufferManager.clearPartition(victim);
- }
- }
- for (Iterator<Integer> pidIterator = ipjd.getProbeJoinMap(pid); pidIterator.hasNext();) {
- // pid has join partitions that are Resident
- int j = pidIterator.next();
- if (inMemJoiner[j] != null) {
- inMemJoiner[j].join(accessorProbe, i, writer);
- }
- }
- }
- ipjd.probeIncrementCount(pid);
- }
- }
-
- /**
- * Ends the probe phase: closes every resident in-memory joiner against {@code writer} and marks its build
- * partition as joined, frees build memory, flushes buffered spilled probe data, and releases the probe buffer.
- */
- public void closeProbe(IFrameWriter writer) throws HyracksDataException {
- // We do NOT join the spilled partitions here, that decision is made at the descriptor level (which join technique to use)
- for (int i = 0; i < inMemJoiner.length; ++i) {
- if (inMemJoiner[i] != null) {
- closeInMemoryJoiner(i, writer);
- ipjd.buildLogJoined(i);
- }
- }
- clearBuildMemory();
- flushAndClearProbeSpilledPartition();
- probeBufferManager.close();
- probeBufferManager = null;
- }
-
- /**
- * Writes probe tuple {@code i} straight to the run file of probe partition {@code pid}, bypassing the probe
- * buffer, through a dedicated variable-size frame appender.
- *
- * @throws HyracksDataException if the tuple cannot be appended even to that frame
- */
- private void flushBigProbeObjectToDisk(int pid, FrameTupleAccessor accessorProbe, int i)
- throws HyracksDataException {
- if (bigProbeFrameAppender == null) {
- bigProbeFrameAppender = new FrameTupleAppender(new VSizeFrame(ctx));
- }
- RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.PROBE);
- if (!bigProbeFrameAppender.append(accessorProbe, i)) {
- throw new HyracksDataException("The given tuple is too big");
- }
- bigProbeFrameAppender.write(runFileWriter, true);
- }
-
- /** Returns a reader over build partition {@code pid}'s run file, or null if it has no run-file writer. */
- public RunFileReader getBuildRFReader(int pid) throws HyracksDataException {
- return (buildRFWriters[pid] == null) ? null : (buildRFWriters[pid]).createReader();
- }
-
- /** Returns a reader over probe partition {@code pid}'s run file, or null if it has no run-file writer. */
- public RunFileReader getProbeRFReader(int pid) throws HyracksDataException {
- return (probeRFWriters[pid] == null) ? null : (probeRFWriters[pid]).createReader();
- }
-
- /**
- * Joins spilled build partitions with spilled probe partitions. Each round reloads the spilled build partitions
- * that fit in memory, creates in-memory joiners for them, streams the matching probe run files past them,
- * then closes the joiners and frees build memory. Rounds repeat while build partitions remain spilled.
- */
- public void joinSpilledPartitions(IFrameWriter writer) throws HyracksDataException {
- LinkedHashMap<Integer, LinkedHashSet<Integer>> probeInMemoryJoinMap;
- if (reloadBuffer == null) {
- reloadBuffer = new VSizeFrame(ctx);
- }
- // NOTE(review): inMemory is only ever added to and cleared; it does not influence the join.
- HashSet<Integer> inMemory = new HashSet<>();
- // NOTE(review): if bringBackSpilledPartitionIfHasMoreMemory() reloads nothing, nothing in this body appears to
- // reduce buildGetSpilledCount(), so the loop would not terminate; confirm.
- while (ipjd.buildGetSpilledCount() > 0) {
- // Load back spilled build partitions.
- // TODO only load partition required for spill join. Consider both sides.
- bringBackSpilledPartitionIfHasMoreMemory();
-
- probeInMemoryJoinMap = ipjd.probeGetInMemoryJoinMap();
-
- // Create in memory joiners.
- for (int pid = ipjd.buildNextInMemoryWithResults(0); pid >= 0; pid = ipjd
- .buildNextInMemoryWithResults(pid + 1)) {
- createInMemoryJoiner(pid);
- inMemory.add(pid);
- }
-
- // Join all build partitions with disk probe partitions.
- for (Entry<Integer, LinkedHashSet<Integer>> entry : probeInMemoryJoinMap.entrySet()) {
- // NOTE(review): this condition admits only entries whose build-partition set is empty, and the inner loop
- // then iterates over that empty set, so no spilled probe tuple is ever joined; likely should be !isEmpty().
- // getProbeRFReader() may also return null here, which would throw on open().
- if (ipjd.probeGetCount(entry.getKey()) > 0 && probeInMemoryJoinMap.get(entry.getKey()).isEmpty()) {
- RunFileReader pReader = getProbeRFReader(entry.getKey());
- pReader.open();
- while (pReader.nextFrame(reloadBuffer)) {
- accessorProbe.reset(reloadBuffer.getBuffer());
- for (int i = 0; i < accessorProbe.getTupleCount(); ++i) {
- // Tuple has potential match from build phase
- for (Integer j : probeInMemoryJoinMap.get(entry.getKey())) {
- // j has join partitions that are Resident
- if (inMemJoiner[j] != null) {
- inMemJoiner[j].join(accessorProbe, i, writer);
- }
- }
- }
- }
- pReader.close();
- }
- }
-
- // Clean up build memory.
- for (int pid = ipjd.buildNextInMemoryWithResults(0); pid >= 0; pid = ipjd
- .buildNextInMemoryWithResults(pid + 1)) {
- closeInMemoryJoiner(pid, writer);
- ipjd.buildLogJoined(pid);
- }
- inMemory.clear();
- clearBuildMemory();
- }
- }
-
- /**
- * Per-partition bookkeeping for the join: tuple counts for both sides, the probe join map (probe partition to
- * candidate build partitions), and bit sets tracking build spilled / resident / joined status and probe spilled
- * status. Reads numOfPartitions from the enclosing joiner.
- */
- class IntervalPartitionJoinData {
- private LinkedHashMap<Integer, LinkedHashSet<Integer>> probeJoinMap;
-
- private int[] buildPSizeInTups;
- private int[] probePSizeInTups;
-
- private BitSet buildJoinedCompleted; //0=waiting, 1=joined
- private BitSet buildSpilledStatus; //0=resident, 1=spilled
- private BitSet buildInMemoryStatus; //0=unknown, 1=resident
- private BitSet probeSpilledStatus; //0=resident, 1=spilled
-
- // NOTE(review): k and imjc are accepted but not used by this constructor.
- public IntervalPartitionJoinData(int k, IIntervalMergeJoinChecker imjc, int numberOfPartitions) {
- probeJoinMap = new LinkedHashMap<>();
-
- buildPSizeInTups = new int[numberOfPartitions];
- probePSizeInTups = new int[numberOfPartitions];
-
- buildJoinedCompleted = new BitSet(numberOfPartitions);
- buildInMemoryStatus = new BitSet(numberOfPartitions);
- buildSpilledStatus = new BitSet(numberOfPartitions);
- probeSpilledStatus = new BitSet(numberOfPartitions);
- }
-
- public LinkedHashMap<Integer, LinkedHashSet<Integer>> probeGetInMemoryJoinMap() {
- return IntervalPartitionUtil.getInMemorySpillJoinMap(probeJoinMap, buildInMemoryStatus, probeSpilledStatus);
- }
-
- public boolean hasProbeJoinMap(int pid) {
- return probeJoinMap.containsKey(pid);
- }
-
- public boolean isProbeJoinMapEmpty(int pid) {
- return probeJoinMap.get(pid).isEmpty();
- }
-
- public Iterator<Integer> getProbeJoinMap(int pid) {
- return probeJoinMap.get(pid).iterator();
- }
-
- // Records the candidate build partitions of probe partition pid; marks pid spilled if any candidate spilled.
- public void setProbeJoinMap(int pid, LinkedHashSet<Integer> map) {
- probeJoinMap.put(new Integer(pid), map);
- for (Integer i : map) {
- if (buildIsSpilled(i)) {
- // Build join partition has spilled. Now spill the probe also.
- probeSpilledStatus.set(pid);
- }
- }
- }
-
- public void buildIncrementCount(int pid) {
- buildInMemoryStatus.set(pid);
- buildPSizeInTups[pid]++;
- }
-
- public int buildGetCount(int pid) {
- return buildPSizeInTups[pid];
- }
-
- public void buildLogJoined(int pid) {
- buildSpilledStatus.clear(pid);
- buildJoinedCompleted.set(pid);
- }
-
- // Same effect as buildLogJoined(): the partition no longer takes part in any join.
- public void buildRemoveFromJoin(int pid) {
- buildSpilledStatus.clear(pid);
- buildJoinedCompleted.set(pid);
- }
-
- public boolean buildHasBeenJoined(int pid) {
- return buildJoinedCompleted.get(pid);
- }
-
- public int buildGetSpilledCount() {
- return buildSpilledStatus.cardinality();
- }
-
- public void buildSpill(int pid) {
- buildInMemoryStatus.clear(pid);
- buildSpilledStatus.set(pid);
- }
-
- public void buildLoad(int pid) {
- buildInMemoryStatus.set(pid);
- buildSpilledStatus.clear(pid);
- }
-
- public boolean buildIsSpilled(int pid) {
- return buildSpilledStatus.get(pid);
- }
-
- public int buildNextSpilled(int pid) {
- return buildSpilledStatus.nextSetBit(pid);
- }
-
- // Next partition >= pid that is not spilled, not yet joined, and has at least one build tuple; -1 if none.
- public int buildNextInMemoryWithResults(int pid) {
- int nextPid = buildNextInMemory(pid);
- do {
- if (nextPid < 0 || buildGetCount(nextPid) > 0) {
- return nextPid;
- }
- nextPid = buildNextInMemory(nextPid + 1);
- } while (nextPid >= 0);
- return -1;
- }
-
- // Next partition >= pid that is not spilled and not yet joined; -1 if none.
- public int buildNextInMemory(int pid) {
- int nextPid = buildSpilledStatus.nextClearBit(pid);
- if (nextPid >= numOfPartitions) {
- return -1;
- }
- do {
- if (!buildHasBeenJoined(nextPid)) {
- return nextPid;
- }
- nextPid = buildSpilledStatus.nextClearBit(nextPid + 1);
- } while (nextPid >= 0 && nextPid < numOfPartitions);
- return -1;
- }
-
- public void probeIncrementCount(int pid) {
- probePSizeInTups[pid]++;
- }
-
- public int probeGetCount(int pid) {
- return probePSizeInTups[pid];
- }
-
- public void probeSpill(int pid) {
- probeSpilledStatus.set(pid);
- }
-
- public boolean probeHasSpilled(int pid) {
- return probeSpilledStatus.get(pid);
- }
-
- public int buildGetMaxPartitionSize() {
- int max = buildPSizeInTups[0];
- for (int i = 1; i < buildPSizeInTups.length; i++) {
- if (buildPSizeInTups[i] > max) {
- max = buildPSizeInTups[i];
- }
- }
- return max;
- }
-
- public int probeGetMaxPartitionSize() {
- int max = probePSizeInTups[0];
- for (int i = 1; i < probePSizeInTups.length; i++) {
- if (probePSizeInTups[i] > max) {
- max = probePSizeInTups[i];
- }
- }
- return max;
- }
-
- }
-
- /**
- * Deletes, quietly, the run file of every build and probe partition that still has a writer.
- * NOTE(review): the writers themselves are not closed here.
- */
- public void closeAndDeleteRunFiles() throws HyracksDataException {
- for (RunFileWriter rfw : buildRFWriters) {
- if (rfw != null) {
- FileUtils.deleteQuietly(rfw.getFileReference().getFile());
- }
- }
- for (RunFileWriter rfw : probeRFWriters) {
- if (rfw != null) {
- FileUtils.deleteQuietly(rfw.getFileReference().getFile());
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
index 21e07a5..0dd358c 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
@@ -57,10 +57,10 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes
private final int[] probeKeys;
private final int[] buildKeys;
- private final int probeTupleCount;
- private final int probeMaxDuration;
- private final int buildTupleCount;
- private final int buildMaxDuration;
+ private final long probeTupleCount;
+ private final long probeMaxDuration;
+ private final long buildTupleCount;
+ private final long buildMaxDuration;
private final int avgTuplesPerFrame;
private final int probeKey;
private final int buildKey;
@@ -69,8 +69,8 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes
private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoinOperatorDescriptor.class.getName());
- public IntervalPartitionJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int leftTupleCount,
- int rightTupleCount, int leftMaxDuration, int rightMaxDuration, int avgTuplesPerFrame, int[] leftKeys,
+ public IntervalPartitionJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, long leftTupleCount,
+ long rightTupleCount, long leftMaxDuration, long rightMaxDuration, int avgTuplesPerFrame, int[] leftKeys,
int[] rightKeys, RecordDescriptor recordDescriptor, IIntervalMergeJoinCheckerFactory imjcf,
IRangeMap rangeMap) {
super(spec, 2, 1);
@@ -108,7 +108,7 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes
}
public static class BuildAndPartitionTaskState extends AbstractStateObject {
- private IntervalPartitionJoin ipj;
+ private IntervalPartitionJoiner ipj;
private int intervalPartitions;
private int partition;
private int k;
@@ -169,7 +169,7 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes
state.intervalPartitions = IntervalPartitionUtil.getMaxPartitions(state.k);
state.memoryForJoin = memsize;
IIntervalMergeJoinChecker imjc = imjcf.createMergeJoinChecker(buildKeys, probeKeys, partition);
- state.ipj = new IntervalPartitionJoin(ctx, state.memoryForJoin, state.k, state.intervalPartitions,
+ state.ipj = new IntervalPartitionJoiner(ctx, state.memoryForJoin, state.k, state.intervalPartitions,
BUILD_REL, PROBE_REL, imjc, buildRd, probeRd, buildHpc, probeHpc);
state.ipj.initBuild();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
new file mode 100644
index 0000000..5df7b0a
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
@@ -0,0 +1,649 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.operators.joins.intervalpartition;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map.Entry;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinChecker;
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.io.RunFileReader;
+import org.apache.hyracks.dataflow.common.io.RunFileWriter;
+import org.apache.hyracks.dataflow.std.buffermanager.IPartitionedMemoryConstrain;
+import org.apache.hyracks.dataflow.std.buffermanager.VPartitionTupleBufferManager;
+import org.apache.hyracks.dataflow.std.structures.TuplePointer;
+
+/**
+ * Runs one pass of the interval partition join over a build relation and a probe relation.
+ * <p>
+ * Build tuples are routed by {@code buildHpc} into {@code numOfPartitions} partitions that share one in-memory
+ * buffer. When the buffer is full, whole partitions are spilled to run files. Probe tuples are routed by
+ * {@code probeHpc} and joined right away with every resident build partition they may match. A probe tuple is
+ * also written to a probe run file when any of its candidate build partitions has been spilled. Those spilled
+ * pairs are joined later by {@link #joinSpilledPartitions(IFrameWriter)}; the calling operator descriptor decides
+ * whether that method is used.
+ */
+public class IntervalPartitionJoiner {
+
+    // Used for special probe BigObject which can not be held into the Join memory
+    private FrameTupleAppender bigProbeFrameAppender;
+
+    /** Selects the relation whose run-file writers are being accessed. */
+    enum SIDE {
+        BUILD,
+        PROBE
+    }
+
+    private final IHyracksTaskContext ctx;
+
+    // Names handed to createManagedWorkspaceFile() when the run files of each side are created.
+    private final String buildRelName;
+    private final String probeRelName;
+
+    private final ITuplePartitionComputer buildHpc;
+    private final ITuplePartitionComputer probeHpc;
+
+    private final RecordDescriptor buildRd;
+    private final RecordDescriptor probeRd;
+
+    private RunFileWriter[] buildRFWriters; //writing spilled build partitions
+    private RunFileWriter[] probeRFWriters; //writing spilled probe partitions
+
+    // Join memory budget in frames; buffers are sized as a frame count times ctx.getInitialFrameSize().
+    private final int memForJoin;
+    // Handed to the partition computers and to IntervalPartitionUtil.getProbeJoinPartitions().
+    private final int k;
+    private final int numOfPartitions;
+    private InMemoryIntervalPartitionJoin[] inMemJoiner; //Used for joining resident partitions
+
+    private VPartitionTupleBufferManager buildBufferManager;
+    private VPartitionTupleBufferManager probeBufferManager;
+
+    private final FrameTupleAccessor accessorBuild;
+    private final FrameTupleAccessor accessorProbe;
+
+    private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoiner.class.getName());
+
+    // stats information: per-partition tuple counts and spilled / resident / joined status
+    private final IntervalPartitionJoinData ipjd;
+
+    // Frame reused whenever a run file is read back (build reloads and spilled probe partitions).
+    private IFrame reloadBuffer;
+    private final TuplePointer tempPtr = new TuplePointer();
+
+    private final IIntervalMergeJoinChecker imjc;
+
+    /**
+     * Creates a joiner. Buffers are allocated later by {@link #initBuild()} and {@link #initProbe()}.
+     *
+     * @param ctx task context used for frames, workspace run files and I/O
+     * @param memForJoin join memory budget, in frames
+     * @param k value handed to the partition computers and to the probe join-map computation
+     * @param numOfPartitions number of partitions on each side
+     * @param buildRelName name used when creating build-side run files
+     * @param probeRelName name used when creating probe-side run files
+     * @param imjc interval predicate checker handed to the in-memory joiners
+     * @param buildRd record descriptor of the build relation
+     * @param probeRd record descriptor of the probe relation
+     * @param buildHpc assigns build tuples to partitions
+     * @param probeHpc assigns probe tuples to partitions
+     */
+    public IntervalPartitionJoiner(IHyracksTaskContext ctx, int memForJoin, int k, int numOfPartitions,
+            String buildRelName, String probeRelName, IIntervalMergeJoinChecker imjc, RecordDescriptor buildRd,
+            RecordDescriptor probeRd, ITuplePartitionComputer buildHpc, ITuplePartitionComputer probeHpc) {
+        this.ctx = ctx;
+        this.memForJoin = memForJoin;
+        this.k = k;
+        this.buildRd = buildRd;
+        this.probeRd = probeRd;
+        this.buildHpc = buildHpc;
+        this.probeHpc = probeHpc;
+        this.imjc = imjc;
+        this.buildRelName = buildRelName;
+        this.probeRelName = probeRelName;
+
+        this.numOfPartitions = numOfPartitions;
+        this.buildRFWriters = new RunFileWriter[numOfPartitions];
+        this.probeRFWriters = new RunFileWriter[numOfPartitions];
+        this.inMemJoiner = new InMemoryIntervalPartitionJoin[numOfPartitions];
+
+        this.accessorBuild = new FrameTupleAccessor(buildRd);
+        this.accessorProbe = new FrameTupleAccessor(probeRd);
+
+        ipjd = new IntervalPartitionJoinData(k, imjc, numOfPartitions);
+    }
+
+    /** Allocates the build buffer: memForJoin frames shared by all build partitions. */
+    public void initBuild() throws HyracksDataException {
+        buildBufferManager = new VPartitionTupleBufferManager(ctx, getPartitionMemoryConstrain(), numOfPartitions,
+                memForJoin * ctx.getInitialFrameSize());
+    }
+
+    private IPartitionedMemoryConstrain getPartitionMemoryConstrain() {
+        return VPartitionTupleBufferManager.NO_CONSTRAIN;
+    }
+
+    /** Routes every tuple of {@code buffer} to its build partition, spilling partitions when memory runs out. */
+    public void build(ByteBuffer buffer) throws HyracksDataException {
+        accessorBuild.reset(buffer);
+        int tupleCount = accessorBuild.getTupleCount();
+
+        for (int i = 0; i < tupleCount; ++i) {
+            int pid = buildHpc.partition(accessorBuild, i, k);
+            processTuple(i, pid);
+            ipjd.buildIncrementCount(pid);
+        }
+    }
+
+    /**
+     * Ends the build phase. Flushes and closes the spilled partitions, reloads as many of them as fit, drops empty
+     * partitions from the join, and creates an in-memory joiner for every resident partition holding data.
+     */
+    public void closeBuild() throws HyracksDataException {
+        int inMemoryPartitions = 0;
+        int totalBuildTuples = 0;
+        flushAndClearBuildSpilledPartition();
+
+        // Trying to bring back as many spilled partitions as possible, making them resident
+        bringBackSpilledPartitionIfHasMoreMemory();
+
+        // Update build partition join map based on partitions with actual data.
+        for (int i = ipjd.buildNextInMemory(0); i >= 0; i = ipjd.buildNextInMemory(i + 1)) {
+            int count = ipjd.buildGetCount(i);
+            if (count == 0) {
+                // Nothing can match an empty partition: mark it finished so later scans skip it.
+                ipjd.buildRemoveFromJoin(i);
+            } else {
+                // Set up build memory for processing joins for partitions in memory.
+                createInMemoryJoiner(i);
+                inMemoryPartitions++;
+                totalBuildTuples += count;
+            }
+        }
+
+        if (LOGGER.isLoggable(Level.FINE)) {
+            LOGGER.fine("IntervalPartitionJoin has closed the build phase. Total tuples: " + totalBuildTuples
+                    + ", In memory partitions: " + inMemoryPartitions + ", Spilled partitions: "
+                    + ipjd.buildGetSpilledCount());
+        }
+    }
+
+    /**
+     * Inserts build tuple {@code tid} into partition {@code pid}, spilling victim partitions until it fits.
+     *
+     * @throws HyracksDataException if the tuple does not fit and no partition can be spilled
+     */
+    private void processTuple(int tid, int pid) throws HyracksDataException {
+        while (!buildBufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) {
+            int victimPartition = selectPartitionToSpill();
+            if (victimPartition < 0) {
+                throw new HyracksDataException(
+                        "No more space left in the memory buffer, please give join more memory budgets.");
+            }
+            spillPartition(victimPartition);
+        }
+    }
+
+    /**
+     * Chooses a build partition to spill. The largest already-spilled partition that still buffers data is
+     * preferred. If there is none, or it holds exactly one frame, the partition from
+     * {@link #selectNextInMemoryPartitionToSpill()} is taken instead when it is larger. Returns -1 if nothing
+     * qualifies.
+     */
+    private int selectPartitionToSpill() {
+        int partitionToSpill = selectLargestSpilledPartition();
+        int maxToSpillPartSize = 0;
+        if (partitionToSpill < 0 || (maxToSpillPartSize = buildBufferManager.getPhysicalSize(partitionToSpill)) == ctx
+                .getInitialFrameSize()) {
+            int partitionInMem = selectNextInMemoryPartitionToSpill();
+            if (partitionInMem >= 0 && buildBufferManager.getPhysicalSize(partitionInMem) > maxToSpillPartSize) {
+                partitionToSpill = partitionInMem;
+            }
+        }
+        return partitionToSpill;
+    }
+
+    /**
+     * Select next partition to spill. The partitions have been numbered in the order they should be spilled.
+     *
+     * @return the lowest-numbered resident, not-yet-joined build partition with tuples and a non-empty buffer,
+     *         or -1 if there is none
+     */
+    private int selectNextInMemoryPartitionToSpill() {
+        for (int i = ipjd.buildNextInMemoryWithResults(0); i >= 0; i = ipjd.buildNextInMemoryWithResults(i + 1)) {
+            if (!ipjd.buildIsSpilled(i) && buildBufferManager.getPhysicalSize(i) > 0) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    /** Returns the spilled build partition with the largest buffered size, or -1 if none buffers anything. */
+    private int selectLargestSpilledPartition() {
+        int pid = -1;
+        int max = 0;
+        for (int i = ipjd.buildNextSpilled(0); i >= 0; i = ipjd.buildNextSpilled(i + 1)) {
+            int partSize = buildBufferManager.getPhysicalSize(i);
+            if (partSize > max) {
+                max = partSize;
+                pid = i;
+            }
+        }
+        return pid;
+    }
+
+    /** Returns the probe partition with the largest buffered size, or -1 if the probe buffer is empty. */
+    private int selectLargestProbeBufferedPartition() {
+        int pid = -1;
+        int max = 0;
+        for (int i = 0; i < numOfPartitions; ++i) {
+            int partSize = probeBufferManager.getPhysicalSize(i);
+            if (partSize > max) {
+                max = partSize;
+                pid = i;
+            }
+        }
+        return pid;
+    }
+
+    /** Flushes build partition {@code pid} to its run file, frees its frames, and marks it spilled. */
+    private void spillPartition(int pid) throws HyracksDataException {
+        RunFileWriter writer = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.BUILD);
+        buildBufferManager.flushPartition(pid, writer);
+        buildBufferManager.clearPartition(pid);
+        ipjd.buildSpill(pid);
+    }
+
+    /** Returns the run-file writer of partition {@code pid} on the given side, creating and opening it on first use. */
+    private RunFileWriter getSpillWriterOrCreateNewOneIfNotExist(int pid, SIDE whichSide) throws HyracksDataException {
+        RunFileWriter[] runFileWriters;
+        String refName;
+        switch (whichSide) {
+            case BUILD:
+                runFileWriters = buildRFWriters;
+                refName = buildRelName;
+                break;
+            case PROBE:
+                refName = probeRelName;
+                runFileWriters = probeRFWriters;
+                break;
+            default:
+                throw new IllegalStateException("Unknown join side: " + whichSide);
+        }
+        RunFileWriter writer = runFileWriters[pid];
+        if (writer == null) {
+            FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(refName);
+            writer = new RunFileWriter(file, ctx.getIOManager());
+            writer.open();
+            runFileWriters[pid] = writer;
+        }
+        return writer;
+    }
+
+    /** Frees every build partition that still buffers tuples and marks it as finished for joining. */
+    public void clearBuildMemory() throws HyracksDataException {
+        for (int pid = 0; pid < numOfPartitions; ++pid) {
+            if (buildBufferManager.getNumTuples(pid) > 0) {
+                buildBufferManager.clearPartition(pid);
+                ipjd.buildRemoveFromJoin(pid);
+            }
+        }
+    }
+
+    /** Flushes the remaining buffered data of every spilled build partition and closes its run file. */
+    private void flushAndClearBuildSpilledPartition() throws HyracksDataException {
+        for (int pid = ipjd.buildNextSpilled(0); pid >= 0; pid = ipjd.buildNextSpilled(pid + 1)) {
+            if (buildBufferManager.getNumTuples(pid) > 0) {
+                buildBufferManager.flushPartition(pid, getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.BUILD));
+                buildBufferManager.clearPartition(pid);
+            }
+            // Close the run file even when nothing is buffered now: the data may have been flushed earlier.
+            if (buildRFWriters[pid] != null) {
+                buildRFWriters[pid].close();
+            }
+        }
+    }
+
+    /** Flushes the remaining buffered probe data of every partition and closes every open probe run file. */
+    private void flushAndClearProbeSpilledPartition() throws HyracksDataException {
+        for (int pid = 0; pid < numOfPartitions; ++pid) {
+            if (probeBufferManager.getNumTuples(pid) > 0) {
+                probeBufferManager.flushPartition(pid, getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.PROBE));
+                probeBufferManager.clearPartition(pid);
+            }
+            // Close the run file even when nothing is buffered now: the data may have been flushed earlier.
+            if (probeRFWriters[pid] != null) {
+                probeRFWriters[pid].close();
+            }
+        }
+    }
+
+    /**
+     * Reloads spilled build partitions into memory while they fit, always taking the lowest-numbered spilled
+     * partition that fits. One frame per spilled partition is kept in reserve for probe data.
+     */
+    private void bringBackSpilledPartitionIfHasMoreMemory() throws HyracksDataException {
+        // we need number of |spilledPartitions| buffers to store the probe data
+        int freeSpace = (memForJoin - ipjd.buildGetSpilledCount()) * ctx.getInitialFrameSize();
+        for (int i = ipjd.buildNextInMemoryWithResults(0); i >= 0; i = ipjd.buildNextInMemoryWithResults(i + 1)) {
+            freeSpace -= buildBufferManager.getPhysicalSize(i);
+        }
+
+        int pid = 0;
+        while ((pid = selectPartitionsToReload(freeSpace, pid)) >= 0) {
+            if (!loadPartitionInMem(pid, buildRFWriters[pid])) {
+                return;
+            }
+            freeSpace -= buildBufferManager.getPhysicalSize(pid);
+        }
+    }
+
+    /**
+     * Reads spilled build partition {@code pid} back into the build buffer.
+     * <p>
+     * The run file is deleted only after the whole partition has been loaded. If an insertion fails, the frames
+     * already taken are released and the run file is kept, because the partition stays marked as spilled and
+     * will be read again later.
+     *
+     * @return true if the partition is now resident, false if it did not fit
+     */
+    private boolean loadPartitionInMem(int pid, RunFileWriter wr) throws HyracksDataException {
+        if (reloadBuffer == null) {
+            reloadBuffer = new VSizeFrame(ctx);
+        }
+        RunFileReader r = wr.createReader();
+        r.open();
+        try {
+            while (r.nextFrame(reloadBuffer)) {
+                accessorBuild.reset(reloadBuffer.getBuffer());
+                for (int tid = 0; tid < accessorBuild.getTupleCount(); tid++) {
+                    if (!buildBufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) {
+                        // for some reason (e.g. due to fragmentation) if the inserting failed, we need to clear the occupied frames
+                        buildBufferManager.clearPartition(pid);
+                        return false;
+                    }
+                }
+            }
+        } finally {
+            r.close();
+        }
+
+        // The partition is fully resident now, so its run file is no longer needed.
+        FileUtils.deleteQuietly(wr.getFileReference().getFile());
+        ipjd.buildLoad(pid);
+        buildRFWriters[pid] = null;
+        return true;
+    }
+
+    /**
+     * Returns the lowest-numbered spilled build partition whose run-file size is at most {@code freeSpace},
+     * or -1 when none fits. The {@code pid} argument is not used.
+     */
+    private int selectPartitionsToReload(int freeSpace, int pid) {
+        for (int id = ipjd.buildNextSpilled(0); id >= 0; id = ipjd.buildNextSpilled(id + 1)) {
+            assert buildRFWriters[id].getFileSize() > 0 : "How comes a spilled partition have size 0?";
+            if (freeSpace >= buildRFWriters[id].getFileSize()) {
+                return id;
+            }
+        }
+        return -1;
+    }
+
+    /** Creates the in-memory joiner for build partition {@code pid} over that partition's buffered frames. */
+    private void createInMemoryJoiner(int pid) throws HyracksDataException {
+        this.inMemJoiner[pid] = new InMemoryIntervalPartitionJoin(ctx,
+                buildBufferManager.getPartitionFrameBufferManager(pid), imjc, buildRd, probeRd);
+    }
+
+    /** Finishes the in-memory joiner of partition {@code pid} against {@code writer} and drops the reference. */
+    private void closeInMemoryJoiner(int pid, IFrameWriter writer) throws HyracksDataException {
+        this.inMemJoiner[pid].closeJoin(writer);
+        this.inMemJoiner[pid] = null;
+    }
+
+    /** Allocates a probe buffer of min(numOfPartitions, memForJoin) frames and a fresh set of probe writers. */
+    public void initProbe() throws HyracksDataException {
+        int probeMemory = Math.min(numOfPartitions, memForJoin);
+        probeBufferManager = new VPartitionTupleBufferManager(ctx, getPartitionMemoryConstrain(), numOfPartitions,
+                probeMemory * ctx.getInitialFrameSize());
+
+        probeRFWriters = new RunFileWriter[numOfPartitions];
+    }
+
+    /**
+     * Processes one frame of probe tuples. Each tuple is routed to a probe partition, whose join map (candidate
+     * build partitions) is computed on first use. If the map is non-empty, the tuple is kept for later when the
+     * probe partition is marked spilled. It is also joined right away with every candidate build partition that
+     * has a resident in-memory joiner.
+     */
+    public void probe(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
+        accessorProbe.reset(buffer);
+        int tupleCount = accessorProbe.getTupleCount();
+
+        for (int i = 0; i < tupleCount; ++i) {
+            int pid = probeHpc.partition(accessorProbe, i, k);
+            if (!ipjd.hasProbeJoinMap(pid)) {
+                // Set probe join map
+                ipjd.setProbeJoinMap(pid,
+                        IntervalPartitionUtil.getProbeJoinPartitions(pid, ipjd.buildPSizeInTups, imjc, k));
+            }
+
+            // Tuple has potential match from build phase
+            if (!ipjd.isProbeJoinMapEmpty(pid)) {
+                if (ipjd.probeHasSpilled(pid)) {
+                    // pid is Spilled: keep the tuple so it can be joined later with the spilled build partitions.
+                    bufferSpilledProbeTuple(pid, i);
+                }
+                for (Iterator<Integer> pidIterator = ipjd.getProbeJoinMap(pid); pidIterator.hasNext();) {
+                    // pid has join partitions that are Resident
+                    int j = pidIterator.next();
+                    if (inMemJoiner[j] != null) {
+                        inMemJoiner[j].join(accessorProbe, i, writer);
+                    }
+                }
+            }
+            ipjd.probeIncrementCount(pid);
+        }
+    }
+
+    /**
+     * Buffers probe tuple {@code tid} for partition {@code pid}. When the probe buffer is full, buffered probe
+     * partitions are flushed to their run files. If the buffer is entirely empty and the tuple still does not
+     * fit, the tuple is written straight to disk.
+     */
+    private void bufferSpilledProbeTuple(int pid, int tid) throws HyracksDataException {
+        while (!probeBufferManager.insertTuple(pid, accessorProbe, tid, tempPtr)) {
+            int victim = pid;
+            if (probeBufferManager.getNumTuples(pid) == 0) {
+                // current pid is empty, choose the probe partition holding the most buffered data
+                victim = selectLargestProbeBufferedPartition();
+            }
+            if (victim < 0) {
+                // current tuple is too big for all the free space
+                flushBigProbeObjectToDisk(pid, accessorProbe, tid);
+                return;
+            }
+            RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(victim, SIDE.PROBE);
+            probeBufferManager.flushPartition(victim, runFileWriter);
+            probeBufferManager.clearPartition(victim);
+        }
+    }
+
+    /**
+     * Ends the probe phase. Closes every resident in-memory joiner against {@code writer} and marks its build
+     * partition as joined, frees build memory, flushes and closes the probe run files, and releases the probe
+     * buffer.
+     */
+    public void closeProbe(IFrameWriter writer) throws HyracksDataException {
+        // We do NOT join the spilled partitions here, that decision is made at the descriptor level (which join technique to use)
+        for (int i = 0; i < inMemJoiner.length; ++i) {
+            if (inMemJoiner[i] != null) {
+                closeInMemoryJoiner(i, writer);
+                ipjd.buildLogJoined(i);
+            }
+        }
+        clearBuildMemory();
+        flushAndClearProbeSpilledPartition();
+        probeBufferManager.close();
+        probeBufferManager = null;
+    }
+
+    /**
+     * Writes probe tuple {@code i} straight to the run file of probe partition {@code pid}, through a dedicated
+     * variable-size frame appender.
+     *
+     * @throws HyracksDataException if the tuple cannot be appended even to that frame
+     */
+    private void flushBigProbeObjectToDisk(int pid, FrameTupleAccessor accessorProbe, int i)
+            throws HyracksDataException {
+        if (bigProbeFrameAppender == null) {
+            bigProbeFrameAppender = new FrameTupleAppender(new VSizeFrame(ctx));
+        }
+        RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.PROBE);
+        if (!bigProbeFrameAppender.append(accessorProbe, i)) {
+            throw new HyracksDataException("The given tuple is too big");
+        }
+        bigProbeFrameAppender.write(runFileWriter, true);
+    }
+
+    /** Returns a reader over build partition {@code pid}'s run file, or null if it has no run-file writer. */
+    public RunFileReader getBuildRFReader(int pid) throws HyracksDataException {
+        return (buildRFWriters[pid] == null) ? null : (buildRFWriters[pid]).createReader();
+    }
+
+    /** Returns a reader over probe partition {@code pid}'s run file, or null if it has no run-file writer. */
+    public RunFileReader getProbeRFReader(int pid) throws HyracksDataException {
+        return (probeRFWriters[pid] == null) ? null : (probeRFWriters[pid]).createReader();
+    }
+
+    /**
+     * Joins spilled build partitions with spilled probe partitions. Each round reloads the spilled build partitions
+     * that fit in memory, creates in-memory joiners for them, streams every spilled probe run file that may match
+     * them past those joiners, and then closes the joiners and frees build memory.
+     *
+     * @throws HyracksDataException if a round cannot reload any spilled build partition
+     */
+    public void joinSpilledPartitions(IFrameWriter writer) throws HyracksDataException {
+        if (reloadBuffer == null) {
+            reloadBuffer = new VSizeFrame(ctx);
+        }
+        while (ipjd.buildGetSpilledCount() > 0) {
+            // Load back spilled build partitions.
+            // TODO only load partition required for spill join. Consider both sides.
+            int spilledBefore = ipjd.buildGetSpilledCount();
+            bringBackSpilledPartitionIfHasMoreMemory();
+            if (ipjd.buildGetSpilledCount() == spilledBefore) {
+                // Nothing could be reloaded, so another round would not make progress.
+                throw new HyracksDataException(
+                        "Unable to reload any spilled build partition, please give join more memory budgets.");
+            }
+
+            LinkedHashMap<Integer, LinkedHashSet<Integer>> probeInMemoryJoinMap = ipjd.probeGetInMemoryJoinMap();
+
+            // Create in memory joiners.
+            for (int pid = ipjd.buildNextInMemoryWithResults(0); pid >= 0; pid = ipjd
+                    .buildNextInMemoryWithResults(pid + 1)) {
+                createInMemoryJoiner(pid);
+            }
+
+            // Join the resident build partitions with the spilled probe partitions that may match them.
+            for (Entry<Integer, LinkedHashSet<Integer>> entry : probeInMemoryJoinMap.entrySet()) {
+                int probePid = entry.getKey();
+                LinkedHashSet<Integer> buildPids = entry.getValue();
+                if (ipjd.probeGetCount(probePid) > 0 && !buildPids.isEmpty()) {
+                    joinProbeRunFile(probePid, buildPids, writer);
+                }
+            }
+
+            // Clean up build memory.
+            for (int pid = ipjd.buildNextInMemoryWithResults(0); pid >= 0; pid = ipjd
+                    .buildNextInMemoryWithResults(pid + 1)) {
+                closeInMemoryJoiner(pid, writer);
+                ipjd.buildLogJoined(pid);
+            }
+            clearBuildMemory();
+        }
+    }
+
+    /**
+     * Streams the run file of probe partition {@code probePid}, if there is one, past the in-memory joiners of
+     * {@code buildPids}. Build partitions without a live joiner are skipped.
+     */
+    private void joinProbeRunFile(int probePid, LinkedHashSet<Integer> buildPids, IFrameWriter writer)
+            throws HyracksDataException {
+        RunFileReader pReader = getProbeRFReader(probePid);
+        if (pReader == null) {
+            // No probe tuple of this partition was written to disk.
+            return;
+        }
+        pReader.open();
+        try {
+            while (pReader.nextFrame(reloadBuffer)) {
+                accessorProbe.reset(reloadBuffer.getBuffer());
+                for (int i = 0; i < accessorProbe.getTupleCount(); ++i) {
+                    // Tuple has potential match from build phase
+                    for (Integer j : buildPids) {
+                        // j has join partitions that are Resident
+                        if (inMemJoiner[j] != null) {
+                            inMemJoiner[j].join(accessorProbe, i, writer);
+                        }
+                    }
+                }
+            }
+        } finally {
+            pReader.close();
+        }
+    }
+
+    /**
+     * Per-partition bookkeeping for the join: tuple counts for both sides, the probe join map (probe partition to
+     * candidate build partitions), and bit sets tracking build spilled / resident / joined status and probe
+     * spilled status. Reads numOfPartitions from the enclosing joiner.
+     */
+    class IntervalPartitionJoinData {
+        private LinkedHashMap<Integer, LinkedHashSet<Integer>> probeJoinMap;
+
+        private int[] buildPSizeInTups;
+        private int[] probePSizeInTups;
+
+        private BitSet buildJoinedCompleted; //0=waiting, 1=joined
+        private BitSet buildSpilledStatus; //0=resident, 1=spilled
+        private BitSet buildInMemoryStatus; //0=unknown, 1=resident
+        private BitSet probeSpilledStatus; //0=resident, 1=spilled
+
+        // k and imjc are accepted for interface compatibility but are not used here.
+        public IntervalPartitionJoinData(int k, IIntervalMergeJoinChecker imjc, int numberOfPartitions) {
+            probeJoinMap = new LinkedHashMap<>();
+
+            buildPSizeInTups = new int[numberOfPartitions];
+            probePSizeInTups = new int[numberOfPartitions];
+
+            buildJoinedCompleted = new BitSet(numberOfPartitions);
+            buildInMemoryStatus = new BitSet(numberOfPartitions);
+            buildSpilledStatus = new BitSet(numberOfPartitions);
+            probeSpilledStatus = new BitSet(numberOfPartitions);
+        }
+
+        public LinkedHashMap<Integer, LinkedHashSet<Integer>> probeGetInMemoryJoinMap() {
+            return IntervalPartitionUtil.getInMemorySpillJoinMap(probeJoinMap, buildInMemoryStatus, probeSpilledStatus);
+        }
+
+        public boolean hasProbeJoinMap(int pid) {
+            return probeJoinMap.containsKey(pid);
+        }
+
+        public boolean isProbeJoinMapEmpty(int pid) {
+            return probeJoinMap.get(pid).isEmpty();
+        }
+
+        public Iterator<Integer> getProbeJoinMap(int pid) {
+            return probeJoinMap.get(pid).iterator();
+        }
+
+        /** Records the candidates of probe partition pid; marks pid spilled if any candidate build partition spilled. */
+        public void setProbeJoinMap(int pid, LinkedHashSet<Integer> map) {
+            probeJoinMap.put(pid, map);
+            for (Integer i : map) {
+                if (buildIsSpilled(i)) {
+                    // Build join partition has spilled. Now spill the probe also.
+                    probeSpilledStatus.set(pid);
+                }
+            }
+        }
+
+        public void buildIncrementCount(int pid) {
+            buildInMemoryStatus.set(pid);
+            buildPSizeInTups[pid]++;
+        }
+
+        public int buildGetCount(int pid) {
+            return buildPSizeInTups[pid];
+        }
+
+        public void buildLogJoined(int pid) {
+            buildSpilledStatus.clear(pid);
+            buildJoinedCompleted.set(pid);
+        }
+
+        // Same effect as buildLogJoined(): the partition no longer takes part in any join.
+        public void buildRemoveFromJoin(int pid) {
+            buildSpilledStatus.clear(pid);
+            buildJoinedCompleted.set(pid);
+        }
+
+        public boolean buildHasBeenJoined(int pid) {
+            return buildJoinedCompleted.get(pid);
+        }
+
+        public int buildGetSpilledCount() {
+            return buildSpilledStatus.cardinality();
+        }
+
+        public void buildSpill(int pid) {
+            buildInMemoryStatus.clear(pid);
+            buildSpilledStatus.set(pid);
+        }
+
+        public void buildLoad(int pid) {
+            buildInMemoryStatus.set(pid);
+            buildSpilledStatus.clear(pid);
+        }
+
+        public boolean buildIsSpilled(int pid) {
+            return buildSpilledStatus.get(pid);
+        }
+
+        public int buildNextSpilled(int pid) {
+            return buildSpilledStatus.nextSetBit(pid);
+        }
+
+        /** Next partition >= pid that is not spilled, not yet joined, and has build tuples; -1 if none. */
+        public int buildNextInMemoryWithResults(int pid) {
+            int nextPid = buildNextInMemory(pid);
+            do {
+                if (nextPid < 0 || buildGetCount(nextPid) > 0) {
+                    return nextPid;
+                }
+                nextPid = buildNextInMemory(nextPid + 1);
+            } while (nextPid >= 0);
+            return -1;
+        }
+
+        /** Next partition >= pid that is not spilled and not yet joined; -1 if none. */
+        public int buildNextInMemory(int pid) {
+            int nextPid = buildSpilledStatus.nextClearBit(pid);
+            if (nextPid >= numOfPartitions) {
+                return -1;
+            }
+            do {
+                if (!buildHasBeenJoined(nextPid)) {
+                    return nextPid;
+                }
+                nextPid = buildSpilledStatus.nextClearBit(nextPid + 1);
+            } while (nextPid >= 0 && nextPid < numOfPartitions);
+            return -1;
+        }
+
+        public void probeIncrementCount(int pid) {
+            probePSizeInTups[pid]++;
+        }
+
+        public int probeGetCount(int pid) {
+            return probePSizeInTups[pid];
+        }
+
+        public void probeSpill(int pid) {
+            probeSpilledStatus.set(pid);
+        }
+
+        public boolean probeHasSpilled(int pid) {
+            return probeSpilledStatus.get(pid);
+        }
+
+        public int buildGetMaxPartitionSize() {
+            int max = buildPSizeInTups[0];
+            for (int i = 1; i < buildPSizeInTups.length; i++) {
+                if (buildPSizeInTups[i] > max) {
+                    max = buildPSizeInTups[i];
+                }
+            }
+            return max;
+        }
+
+        public int probeGetMaxPartitionSize() {
+            int max = probePSizeInTups[0];
+            for (int i = 1; i < probePSizeInTups.length; i++) {
+                if (probePSizeInTups[i] > max) {
+                    max = probePSizeInTups[i];
+                }
+            }
+            return max;
+        }
+
+    }
+
+    /**
+     * Quietly deletes the run file of every build and probe partition that still has a writer.
+     * The writers themselves are not closed here.
+     */
+    public void closeAndDeleteRunFiles() throws HyracksDataException {
+        for (RunFileWriter rfw : buildRFWriters) {
+            if (rfw != null) {
+                FileUtils.deleteQuietly(rfw.getFileReference().getFile());
+            }
+        }
+        for (RunFileWriter rfw : probeRFWriters) {
+            if (rfw != null) {
+                FileUtils.deleteQuietly(rfw.getFileReference().getFile());
+            }
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java
index e05c06e..c6e95e1 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java
@@ -37,15 +37,15 @@ public class IntervalPartitionUtil {
private IntervalPartitionUtil() {
}
- public static int determineK(int countR, int maxDurationR, int countS, int maxDurationS, int avgTuplePerFrame) {
+ public static int determineK(long countR, long maxDurationR, long countS, long maxDurationS, int avgTuplePerFrame) {
double deltaR = 1.0 / maxDurationR;
double deltaS = 1.0 / maxDurationS;
- int knMinusTwo = 0;
- int knMinusOne = 0;
- int kn = 1;
+ long knMinusTwo = 0;
+ long knMinusOne = 0;
+ long kn = 1;
- int prn = determinePn(kn, countR, deltaR);
+ long prn = determinePn(kn, countR, deltaR);
double tn = determineTn(kn, determinePn(kn, countS, deltaS));
while ((kn != knMinusOne) && (kn != knMinusTwo)) {
@@ -55,21 +55,25 @@ public class IntervalPartitionUtil {
prn = determinePn(kn, countR, deltaR);
tn = determineTn(kn, determinePn(kn, countS, deltaS));
}
- return kn;
+ if (kn > Integer.MAX_VALUE) {
+ return Integer.MAX_VALUE;
+ } else {
+ return (int) kn;
+ }
}
- public static int determineKn(int countR, int countS, int avgTuplePerFrame, int prn, double tn) {
+ public static long determineKn(long countR, long countS, int avgTuplePerFrame, long prn, double tn) {
double factorS = (3.0 * countS) / (2 * (C_IO + 2 * C_CPU) * tn);
double factorR = (C_IO / avgTuplePerFrame) + ((4.0 * countR * C_CPU) / prn);
- return (int) Math.cbrt(factorS * factorR);
+ return (long) Math.cbrt(factorS * factorR);
}
- public static int determinePn(int kn, int count, double delta) {
+ public static long determinePn(long kn, long count, double delta) {
long knDelta = (long) Math.ceil(kn * delta);
- return Math.min((int) ((kn * knDelta) + kn - ((knDelta * knDelta) / 2.0) - (knDelta / 2.0)), count);
+ return Math.min((long) ((kn * knDelta) + kn - ((knDelta * knDelta) / 2.0) - (knDelta / 2.0)), count);
}
- public static double determineTn(int kn, int pn) {
+ public static double determineTn(long kn, long pn) {
return pn / ((kn * kn + kn) / 2.0);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/AbstractExpressionAnnotation.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/AbstractExpressionAnnotation.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/AbstractExpressionAnnotation.java
new file mode 100644
index 0000000..de02572
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/AbstractExpressionAnnotation.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.expressions;
+
+/**
+ * Base class for expression annotations that carry a single opaque payload object.
+ * <p>
+ * The payload accessors of {@link IExpressionAnnotation} are implemented here once;
+ * concrete subclasses still provide {@link IExpressionAnnotation#copy()}.
+ */
+public abstract class AbstractExpressionAnnotation implements IExpressionAnnotation {
+
+    /** Payload attached to this annotation; {@code null} until set. Subclasses may access it directly. */
+    protected Object object;
+
+    @Override
+    public void setObject(Object object) {
+        this.object = object;
+    }
+
+    @Override
+    public Object getObject() {
+        return this.object;
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ExpressionAnnotationNoCopyImpl.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ExpressionAnnotationNoCopyImpl.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ExpressionAnnotationNoCopyImpl.java
index 140dfb1..3aa34c8 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ExpressionAnnotationNoCopyImpl.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ExpressionAnnotationNoCopyImpl.java
@@ -18,23 +18,11 @@
*/
package org.apache.hyracks.algebricks.core.algebra.expressions;
-public class ExpressionAnnotationNoCopyImpl implements IExpressionAnnotation {
-
- private Object object;
+public class ExpressionAnnotationNoCopyImpl extends AbstractExpressionAnnotation {
@Override
public IExpressionAnnotation copy() {
return this;
}
- @Override
- public Object getObject() {
- return object;
- }
-
- @Override
- public void setObject(Object object) {
- this.object = object;
- }
-
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/IndexedNLJoinExpressionAnnotation.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/IndexedNLJoinExpressionAnnotation.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/IndexedNLJoinExpressionAnnotation.java
index 5ee6b07..91c0a8b 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/IndexedNLJoinExpressionAnnotation.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/IndexedNLJoinExpressionAnnotation.java
@@ -18,23 +18,11 @@
*/
package org.apache.hyracks.algebricks.core.algebra.expressions;
-public class IndexedNLJoinExpressionAnnotation implements IExpressionAnnotation {
+public class IndexedNLJoinExpressionAnnotation extends AbstractExpressionAnnotation {
public static final String HINT_STRING = "indexnl";
public static final IndexedNLJoinExpressionAnnotation INSTANCE = new IndexedNLJoinExpressionAnnotation();
- private Object object;
-
- @Override
- public Object getObject() {
- return object;
- }
-
- @Override
- public void setObject(Object object) {
- this.object = object;
- }
-
@Override
public IExpressionAnnotation copy() {
IndexedNLJoinExpressionAnnotation clone = new IndexedNLJoinExpressionAnnotation();