You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by pr...@apache.org on 2016/10/17 19:55:13 UTC
[23/50] [abbrv] asterixdb git commit: Finalize the range in context.
Finalize the range in context.
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/13af53a7
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/13af53a7
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/13af53a7
Branch: refs/heads/ecarm002/interval_join_merge
Commit: 13af53a7b26c421e58848f6614b2cd0534cd72f4
Parents: 2061a38
Author: Preston Carman <pr...@apache.org>
Authored: Mon Aug 15 14:30:00 2016 -0700
Committer: Preston Carman <pr...@apache.org>
Committed: Mon Aug 15 14:30:00 2016 -0700
----------------------------------------------------------------------
.../physical/AbstractIntervalJoinPOperator.java | 19 +++++++++++----
.../IntervalLocalRangeOperatorDescriptor.java | 2 +-
.../rules/IntervalSplitPartitioningRule.java | 8 ++++---
.../asterix/optimizer/rules/util/JoinUtils.java | 25 +++++++++++---------
.../AfterIntervalMergeJoinCheckerFactory.java | 4 ++--
.../BeforeIntervalMergeJoinCheckerFactory.java | 4 ++--
...overedByIntervalMergeJoinCheckerFactory.java | 4 ++--
.../CoversIntervalMergeJoinCheckerFactory.java | 4 ++--
.../EndedByIntervalMergeJoinCheckerFactory.java | 4 ++--
.../EndsIntervalMergeJoinCheckerFactory.java | 4 ++--
.../joins/IIntervalMergeJoinCheckerFactory.java | 4 ++--
.../MeetsIntervalMergeJoinCheckerFactory.java | 4 ++--
.../MetByIntervalMergeJoinCheckerFactory.java | 4 ++--
...lappedByIntervalMergeJoinCheckerFactory.java | 4 ++--
...rlappingIntervalMergeJoinCheckerFactory.java | 15 ++++++++++--
...OverlapsIntervalMergeJoinCheckerFactory.java | 4 ++--
...tartedByIntervalMergeJoinCheckerFactory.java | 4 ++--
.../StartsIntervalMergeJoinCheckerFactory.java | 4 ++--
.../intervalindex/IntervalIndexJoiner.java | 2 +-
...IntervalPartitionJoinOperatorDescriptor.java | 16 +++++++------
.../operators/physical/MergeJoinPOperator.java | 15 +++++++++---
.../properties/OrderedPartitionedProperty.java | 6 +++++
.../algebra/util/OperatorPropertiesUtil.java | 15 +++++++-----
.../rules/EnforceStructuralPropertiesRule.java | 4 +---
.../hyracks/dataflow/std/base/RangeId.java | 25 ++++++++++++++++++--
.../connectors/PartitionRangeDataWriter.java | 2 +-
.../std/join/IMergeJoinCheckerFactory.java | 4 ++--
.../std/join/MergeJoinOperatorDescriptor.java | 2 +-
.../join/NaturalMergeJoinCheckerFactory.java | 4 ++--
.../misc/RangeForwardOperatorDescriptor.java | 3 ++-
.../std/sort/AbstractExternalSortRunMerger.java | 24 +++++++++----------
31 files changed, 155 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AbstractIntervalJoinPOperator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AbstractIntervalJoinPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AbstractIntervalJoinPOperator.java
index c400cdf..3be9e80 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AbstractIntervalJoinPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AbstractIntervalJoinPOperator.java
@@ -84,10 +84,18 @@ public abstract class AbstractIntervalJoinPOperator extends AbstractJoinPOperato
return mjcf;
}
- public RangeId getRangeId() {
+ public RangeId getLeftRangeId() {
return leftRangeId;
}
+ public RangeId getRightRangeId() {
+ return rightRangeId;
+ }
+
+ public IRangeMap getRangeMapHint() {
+ return rangeMapHint;
+ }
+
@Override
public PhysicalOperatorTag getOperatorTag() {
return PhysicalOperatorTag.EXTENSION_OPERATOR;
@@ -111,7 +119,8 @@ public abstract class AbstractIntervalJoinPOperator extends AbstractJoinPOperato
for (LogicalVariable v : keysLeftBranch) {
order.add(new OrderColumn(v, mjcf.isOrderAsc() ? OrderKind.ASC : OrderKind.DESC));
}
- IPartitioningProperty pp = new OrderedPartitionedProperty(order, null, leftRangeId, RangePartitioningType.PROJECT, rangeMapHint);
+ IPartitioningProperty pp = new OrderedPartitionedProperty(order, null, leftRangeId,
+ RangePartitioningType.PROJECT, rangeMapHint);
List<ILocalStructuralProperty> propsLocal = new ArrayList<>();
propsLocal.add(new LocalOrderProperty(order));
deliveredProperties = new StructuralPropertiesVector(pp, propsLocal);
@@ -141,8 +150,10 @@ public abstract class AbstractIntervalJoinPOperator extends AbstractJoinPOperato
ispRight.add(new LocalOrderProperty(orderRight));
if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
- ppLeft = new OrderedPartitionedProperty(orderLeft, null, leftRangeId, mjcf.getLeftPartitioningType(), rangeMapHint);
- ppRight = new OrderedPartitionedProperty(orderRight, null, rightRangeId, mjcf.getRightPartitioningType(), rangeMapHint);
+ ppLeft = new OrderedPartitionedProperty(orderLeft, null, leftRangeId, mjcf.getLeftPartitioningType(),
+ rangeMapHint);
+ ppRight = new OrderedPartitionedProperty(orderRight, null, rightRangeId, mjcf.getRightPartitioningType(),
+ rangeMapHint);
}
pv[0] = new StructuralPropertiesVector(ppLeft, ispLeft);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java
index 584e30f..f24dc7c 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java
@@ -154,7 +154,7 @@ public class IntervalLocalRangeOperatorDescriptor extends AbstractOperatorDescri
writers[i].open();
resultAppender[i] = new FrameTupleAppender(new VSizeFrame(ctx), true);
}
- RangeForwardTaskState rangeState = (RangeForwardTaskState) ctx.getStateObject(rangeId);
+ RangeForwardTaskState rangeState = (RangeForwardTaskState) ctx.getStateObject(new RangeId(rangeId.getId(), ctx));
IRangeMap rangeMap = rangeState.getRangeMap();
nodeRangeStart = getPartitionBoundryStart(rangeMap);
nodeRangeEnd = getPartitionBoundryEnd(rangeMap);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java
index eafc2bb..f0ff610 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java
@@ -418,14 +418,15 @@ public class IntervalSplitPartitioningRule implements IAlgebraicRewriteRule {
MergeJoinPOperator mjpo = (MergeJoinPOperator) joinPo;
MergeJoinPOperator mjpoClone = new MergeJoinPOperator(mjpo.getKind(), mjpo.getPartitioningType(),
mjpo.getKeysLeftBranch(), mjpo.getKeysRightBranch(), memoryJoinSize,
- mjpo.getMergeJoinCheckerFactory(), mjpo.getRangeId(), null);
+ mjpo.getMergeJoinCheckerFactory(), mjpo.getLeftRangeId(), mjpo.getRightRangeId(), null);
ijoClone.setPhysicalOperator(mjpoClone);
} else if (joinPo.getOperatorTag() == PhysicalOperatorTag.EXTENSION_OPERATOR) {
if (joinPo instanceof IntervalIndexJoinPOperator) {
IntervalIndexJoinPOperator iijpo = (IntervalIndexJoinPOperator) joinPo;
IntervalIndexJoinPOperator iijpoClone = new IntervalIndexJoinPOperator(iijpo.getKind(),
iijpo.getPartitioningType(), iijpo.getKeysLeftBranch(), iijpo.getKeysRightBranch(),
- memoryJoinSize, iijpo.getIntervalMergeJoinCheckerFactory(), iijpo.getRangeId(), null);
+ memoryJoinSize, iijpo.getIntervalMergeJoinCheckerFactory(), iijpo.getLeftRangeId(),
+ iijpo.getRightRangeId(), null);
ijoClone.setPhysicalOperator(iijpoClone);
} else if (joinPo instanceof IntervalPartitionJoinPOperator) {
IntervalPartitionJoinPOperator ipjpo = (IntervalPartitionJoinPOperator) joinPo;
@@ -433,7 +434,8 @@ public class IntervalSplitPartitioningRule implements IAlgebraicRewriteRule {
ipjpo.getPartitioningType(), ipjpo.getKeysLeftBranch(), ipjpo.getKeysRightBranch(),
memoryJoinSize, ipjpo.getBuildTupleCount(), ipjpo.getProbeTupleCount(),
ipjpo.getBuildMaxDuration(), ipjpo.getProbeMaxDuration(), ipjpo.getAvgTuplesInFrame(),
- ipjpo.getIntervalMergeJoinCheckerFactory(), ipjpo.getRangeId(), null);
+ ipjpo.getIntervalMergeJoinCheckerFactory(), ipjpo.getLeftRangeId(), ipjpo.getRightRangeId(),
+ null);
ijoClone.setPhysicalOperator(iijpoClone);
} else {
return null;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java
index 2707403..795ee82 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java
@@ -55,7 +55,6 @@ import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceE
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractJoinPOperator.JoinPartitioningType;
-import org.apache.hyracks.api.dataflow.value.IRangeMap;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.MergeJoinPOperator;
import org.apache.hyracks.dataflow.std.base.RangeId;
import org.apache.hyracks.dataflow.std.join.IMergeJoinCheckerFactory;
@@ -142,9 +141,10 @@ public class JoinUtils {
private static void setSortMergeIntervalJoinOp(AbstractBinaryJoinOperator op, FunctionIdentifier fi,
List<LogicalVariable> sideLeft, List<LogicalVariable> sideRight, IntervalJoinExpressionAnnotation ijea,
IOptimizationContext context) {
- IMergeJoinCheckerFactory mjcf = getIntervalMergeJoinCheckerFactory(fi);
+ RangeId leftRangeId = context.newRangeId();
+ IMergeJoinCheckerFactory mjcf = getIntervalMergeJoinCheckerFactory(fi, leftRangeId);
op.setPhysicalOperator(new MergeJoinPOperator(op.getJoinKind(), JoinPartitioningType.BROADCAST, sideLeft,
- sideRight, context.getPhysicalOptimizationConfig().getMaxFramesForJoin(), mjcf, context.newRangeId(),
+ sideRight, context.getPhysicalOptimizationConfig().getMaxFramesForJoin(), mjcf, leftRangeId,
context.newRangeId(), ijea.getRangeMap()));
}
@@ -161,20 +161,22 @@ public class JoinUtils {
int tuplesPerFrame = ijea.getTuplesPerFrame() > 0 ? ijea.getTuplesPerFrame()
: context.getPhysicalOptimizationConfig().getMaxRecordsPerFrame();
- IIntervalMergeJoinCheckerFactory mjcf = getIntervalMergeJoinCheckerFactory(fi);
+ RangeId leftRangeId = context.newRangeId();
+ IIntervalMergeJoinCheckerFactory mjcf = getIntervalMergeJoinCheckerFactory(fi, leftRangeId);
op.setPhysicalOperator(new IntervalPartitionJoinPOperator(op.getJoinKind(), JoinPartitioningType.BROADCAST,
sideLeft, sideRight, context.getPhysicalOptimizationConfig().getMaxFramesForJoin(), leftCount,
- rightCount, leftMaxDuration, rightMaxDuration, tuplesPerFrame, mjcf, context.newRangeId(),
- context.newRangeId(), ijea.getRangeMap()));
+ rightCount, leftMaxDuration, rightMaxDuration, tuplesPerFrame, mjcf, leftRangeId, context.newRangeId(),
+ ijea.getRangeMap()));
}
private static void setIntervalIndexJoinOp(AbstractBinaryJoinOperator op, FunctionIdentifier fi,
List<LogicalVariable> sideLeft, List<LogicalVariable> sideRight, IntervalJoinExpressionAnnotation ijea,
IOptimizationContext context) {
- IIntervalMergeJoinCheckerFactory mjcf = getIntervalMergeJoinCheckerFactory(fi);
+ RangeId leftRangeId = context.newRangeId();
+ IIntervalMergeJoinCheckerFactory mjcf = getIntervalMergeJoinCheckerFactory(fi, leftRangeId);
op.setPhysicalOperator(new IntervalIndexJoinPOperator(op.getJoinKind(), JoinPartitioningType.BROADCAST,
- sideLeft, sideRight, context.getPhysicalOptimizationConfig().getMaxFramesForJoin(), mjcf,
- context.newRangeId(), context.newRangeId(), ijea.getRangeMap()));
+ sideLeft, sideRight, context.getPhysicalOptimizationConfig().getMaxFramesForJoin(), mjcf, leftRangeId,
+ context.newRangeId(), ijea.getRangeMap()));
}
private static int getMaxDuration(List<LogicalVariable> lv, IOptimizationContext context) {
@@ -230,8 +232,9 @@ public class JoinUtils {
}
}
- private static IIntervalMergeJoinCheckerFactory getIntervalMergeJoinCheckerFactory(FunctionIdentifier fi) {
- IIntervalMergeJoinCheckerFactory mjcf = new OverlappingIntervalMergeJoinCheckerFactory();
+ private static IIntervalMergeJoinCheckerFactory getIntervalMergeJoinCheckerFactory(FunctionIdentifier fi,
+ RangeId rangeId) {
+ IIntervalMergeJoinCheckerFactory mjcf = new OverlappingIntervalMergeJoinCheckerFactory(rangeId);
if (fi.equals(AsterixBuiltinFunctions.INTERVAL_OVERLAPPED_BY)) {
mjcf = new OverlappedByIntervalMergeJoinCheckerFactory();
} else if (fi.equals(AsterixBuiltinFunctions.INTERVAL_OVERLAPS)) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AfterIntervalMergeJoinCheckerFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AfterIntervalMergeJoinCheckerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AfterIntervalMergeJoinCheckerFactory.java
index 572241c..09b3020 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AfterIntervalMergeJoinCheckerFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AfterIntervalMergeJoinCheckerFactory.java
@@ -18,14 +18,14 @@
*/
package org.apache.asterix.runtime.operators.joins;
-import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
public class AfterIntervalMergeJoinCheckerFactory extends AbstractIntervalMergeJoinCheckerFactory {
private static final long serialVersionUID = 1L;
@Override
- public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IRangeMap rangeMap) {
+ public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IHyracksTaskContext ctx) {
return new AfterIntervalMergeJoinChecker(keys0, keys1);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/BeforeIntervalMergeJoinCheckerFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/BeforeIntervalMergeJoinCheckerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/BeforeIntervalMergeJoinCheckerFactory.java
index ff5acf2..3e913d2 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/BeforeIntervalMergeJoinCheckerFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/BeforeIntervalMergeJoinCheckerFactory.java
@@ -18,14 +18,14 @@
*/
package org.apache.asterix.runtime.operators.joins;
-import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
public class BeforeIntervalMergeJoinCheckerFactory extends AbstractIntervalMergeJoinCheckerFactory {
private static final long serialVersionUID = 1L;
@Override
- public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IRangeMap rangeMap) {
+ public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IHyracksTaskContext ctx) {
return new BeforeIntervalMergeJoinChecker(keys0, keys1);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/CoveredByIntervalMergeJoinCheckerFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/CoveredByIntervalMergeJoinCheckerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/CoveredByIntervalMergeJoinCheckerFactory.java
index 64b0c2a..a513cbc 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/CoveredByIntervalMergeJoinCheckerFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/CoveredByIntervalMergeJoinCheckerFactory.java
@@ -18,13 +18,13 @@
*/
package org.apache.asterix.runtime.operators.joins;
-import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
public class CoveredByIntervalMergeJoinCheckerFactory extends AbstractIntervalInverseMergeJoinCheckerFactory {
private static final long serialVersionUID = 1L;
@Override
- public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IRangeMap rangeMap) {
+ public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IHyracksTaskContext ctx) {
return new CoveredByIntervalMergeJoinChecker(keys0, keys1);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/CoversIntervalMergeJoinCheckerFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/CoversIntervalMergeJoinCheckerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/CoversIntervalMergeJoinCheckerFactory.java
index dc50451..cc9b37d 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/CoversIntervalMergeJoinCheckerFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/CoversIntervalMergeJoinCheckerFactory.java
@@ -18,13 +18,13 @@
*/
package org.apache.asterix.runtime.operators.joins;
-import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
public class CoversIntervalMergeJoinCheckerFactory extends AbstractIntervalMergeJoinCheckerFactory {
private static final long serialVersionUID = 1L;
@Override
- public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IRangeMap rangeMap) {
+ public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IHyracksTaskContext ctx) {
return new CoversIntervalMergeJoinChecker(keys0, keys1);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/EndedByIntervalMergeJoinCheckerFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/EndedByIntervalMergeJoinCheckerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/EndedByIntervalMergeJoinCheckerFactory.java
index 68e2922..c3a681c 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/EndedByIntervalMergeJoinCheckerFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/EndedByIntervalMergeJoinCheckerFactory.java
@@ -18,14 +18,14 @@
*/
package org.apache.asterix.runtime.operators.joins;
-import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
public class EndedByIntervalMergeJoinCheckerFactory extends AbstractIntervalInverseMergeJoinCheckerFactory {
private static final long serialVersionUID = 1L;
@Override
- public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IRangeMap rangeMap) {
+ public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IHyracksTaskContext ctx) {
return new EndedByIntervalMergeJoinChecker(keys0, keys1);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/EndsIntervalMergeJoinCheckerFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/EndsIntervalMergeJoinCheckerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/EndsIntervalMergeJoinCheckerFactory.java
index e5b7be0..295bd04 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/EndsIntervalMergeJoinCheckerFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/EndsIntervalMergeJoinCheckerFactory.java
@@ -18,14 +18,14 @@
*/
package org.apache.asterix.runtime.operators.joins;
-import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
public class EndsIntervalMergeJoinCheckerFactory extends AbstractIntervalMergeJoinCheckerFactory {
private static final long serialVersionUID = 1L;
@Override
- public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IRangeMap rangeMap) {
+ public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IHyracksTaskContext ctx) {
return new EndsIntervalMergeJoinChecker(keys0, keys1);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/IIntervalMergeJoinCheckerFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/IIntervalMergeJoinCheckerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/IIntervalMergeJoinCheckerFactory.java
index e4ceeb1..f2e3d80 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/IIntervalMergeJoinCheckerFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/IIntervalMergeJoinCheckerFactory.java
@@ -20,14 +20,14 @@ package org.apache.asterix.runtime.operators.joins;
import java.io.Serializable;
-import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.std.join.IMergeJoinCheckerFactory;
public interface IIntervalMergeJoinCheckerFactory extends IMergeJoinCheckerFactory, Serializable {
@Override
- public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IRangeMap rangeMap)
+ public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IHyracksTaskContext ctx)
throws HyracksDataException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/MeetsIntervalMergeJoinCheckerFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/MeetsIntervalMergeJoinCheckerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/MeetsIntervalMergeJoinCheckerFactory.java
index 038f9ef..c970fd2 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/MeetsIntervalMergeJoinCheckerFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/MeetsIntervalMergeJoinCheckerFactory.java
@@ -18,14 +18,14 @@
*/
package org.apache.asterix.runtime.operators.joins;
-import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
public class MeetsIntervalMergeJoinCheckerFactory extends AbstractIntervalMergeJoinCheckerFactory {
private static final long serialVersionUID = 1L;
@Override
- public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IRangeMap rangeMap) {
+ public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IHyracksTaskContext ctx) {
return new MeetsIntervalMergeJoinChecker(keys0, keys1);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/MetByIntervalMergeJoinCheckerFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/MetByIntervalMergeJoinCheckerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/MetByIntervalMergeJoinCheckerFactory.java
index 6c3fe32..fad2d88 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/MetByIntervalMergeJoinCheckerFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/MetByIntervalMergeJoinCheckerFactory.java
@@ -18,14 +18,14 @@
*/
package org.apache.asterix.runtime.operators.joins;
-import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
public class MetByIntervalMergeJoinCheckerFactory extends AbstractIntervalInverseMergeJoinCheckerFactory {
private static final long serialVersionUID = 1L;
@Override
- public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IRangeMap rangeMap) {
+ public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IHyracksTaskContext ctx) {
return new MetByIntervalMergeJoinChecker(keys0, keys1);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlappedByIntervalMergeJoinCheckerFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlappedByIntervalMergeJoinCheckerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlappedByIntervalMergeJoinCheckerFactory.java
index 8031181..c47381a 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlappedByIntervalMergeJoinCheckerFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlappedByIntervalMergeJoinCheckerFactory.java
@@ -18,13 +18,13 @@
*/
package org.apache.asterix.runtime.operators.joins;
-import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
public class OverlappedByIntervalMergeJoinCheckerFactory extends AbstractIntervalInverseMergeJoinCheckerFactory {
private static final long serialVersionUID = 1L;
@Override
- public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IRangeMap rangeMap) {
+ public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IHyracksTaskContext ctx) {
return new OverlappedByIntervalMergeJoinChecker(keys0, keys1);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlappingIntervalMergeJoinCheckerFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlappingIntervalMergeJoinCheckerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlappingIntervalMergeJoinCheckerFactory.java
index 195a85f..a5f7770 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlappingIntervalMergeJoinCheckerFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlappingIntervalMergeJoinCheckerFactory.java
@@ -19,18 +19,29 @@
package org.apache.asterix.runtime.operators.joins;
import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IRangeMap;
import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.hyracks.dataflow.std.base.RangeId;
+import org.apache.hyracks.dataflow.std.misc.RangeForwardOperatorDescriptor.RangeForwardTaskState;
public class OverlappingIntervalMergeJoinCheckerFactory extends AbstractIntervalMergeJoinCheckerFactory {
private static final long serialVersionUID = 1L;
+ private final RangeId rangeId;
+
+ public OverlappingIntervalMergeJoinCheckerFactory(RangeId rangeId) {
+ this.rangeId = rangeId;
+ }
@Override
- public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IRangeMap rangeMap)
- throws HyracksDataException {
+ public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition,
+ IHyracksTaskContext ctx) throws HyracksDataException {
int fieldIndex = 0;
+ RangeForwardTaskState rangeState = (RangeForwardTaskState) ctx
+ .getStateObject(new RangeId(rangeId.getId(), ctx));
+ IRangeMap rangeMap = rangeState.getRangeMap();
if (ATypeTag.INT64.serialize() != rangeMap.getTag(0, 0)) {
throw new HyracksDataException("Invalid range map type for interval merge join checker.");
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlapsIntervalMergeJoinCheckerFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlapsIntervalMergeJoinCheckerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlapsIntervalMergeJoinCheckerFactory.java
index e3ecf2e..0cf3ac1 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlapsIntervalMergeJoinCheckerFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlapsIntervalMergeJoinCheckerFactory.java
@@ -18,13 +18,13 @@
*/
package org.apache.asterix.runtime.operators.joins;
-import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
public class OverlapsIntervalMergeJoinCheckerFactory extends AbstractIntervalMergeJoinCheckerFactory {
private static final long serialVersionUID = 1L;
@Override
- public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IRangeMap rangeMap) {
+ public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IHyracksTaskContext ctx) {
return new OverlapsIntervalMergeJoinChecker(keys0, keys1);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/StartedByIntervalMergeJoinCheckerFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/StartedByIntervalMergeJoinCheckerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/StartedByIntervalMergeJoinCheckerFactory.java
index 431aa8e..0938fe2 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/StartedByIntervalMergeJoinCheckerFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/StartedByIntervalMergeJoinCheckerFactory.java
@@ -18,14 +18,14 @@
*/
package org.apache.asterix.runtime.operators.joins;
-import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
public class StartedByIntervalMergeJoinCheckerFactory extends AbstractIntervalInverseMergeJoinCheckerFactory {
private static final long serialVersionUID = 1L;
@Override
- public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IRangeMap rangeMap) {
+ public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IHyracksTaskContext ctx) {
return new StartedByIntervalMergeJoinChecker(keys0, keys1);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/StartsIntervalMergeJoinCheckerFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/StartsIntervalMergeJoinCheckerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/StartsIntervalMergeJoinCheckerFactory.java
index a05615c..924b442 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/StartsIntervalMergeJoinCheckerFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/StartsIntervalMergeJoinCheckerFactory.java
@@ -18,14 +18,14 @@
*/
package org.apache.asterix.runtime.operators.joins;
-import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
public class StartsIntervalMergeJoinCheckerFactory extends AbstractIntervalMergeJoinCheckerFactory {
private static final long serialVersionUID = 1L;
@Override
- public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IRangeMap rangeMap) {
+ public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IHyracksTaskContext ctx) {
return new StartsIntervalMergeJoinChecker(keys0, keys1);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
index 9ca536b..6f04cad 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
@@ -82,7 +82,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
super(ctx, partition, status, locks, leftRd, rightRd);
this.point = imjcf.isOrderAsc() ? EndPointIndexItem.START_POINT : EndPointIndexItem.END_POINT;
- this.imjc = imjcf.createMergeJoinChecker(leftKeys, rightKeys, partition, null);
+ this.imjc = imjcf.createMergeJoinChecker(leftKeys, rightKeys, partition, ctx);
this.leftKey = leftKeys[0];
this.rightKey = rightKeys[0];
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
index 8c4c43d..4e1850c 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
@@ -160,18 +160,20 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes
}
}
- RangeForwardTaskState rangeState = (RangeForwardTaskState) ctx.getStateObject(rangeId);
- long partitionStart = IntervalPartitionUtil.getStartOfPartition(rangeState.getRangeMap(), partition);
+ RangeForwardTaskState rangeState = (RangeForwardTaskState) ctx
+ .getStateObject(new RangeId(rangeId.getId(), ctx));
+ long partitionStart = IntervalPartitionUtil.getStartOfPartition(rangeState.getRangeMap(),
+ partition);
long partitionEnd = IntervalPartitionUtil.getEndOfPartition(rangeState.getRangeMap(), partition);
- ITuplePartitionComputer buildHpc = new IntervalPartitionComputerFactory(buildKey, k, partitionStart,
- partitionEnd).createPartitioner();
- ITuplePartitionComputer probeHpc = new IntervalPartitionComputerFactory(probeKey, k, partitionStart,
- partitionEnd).createPartitioner();
+ ITuplePartitionComputer buildHpc = new IntervalPartitionComputerFactory(buildKey, state.k,
+ partitionStart, partitionEnd).createPartitioner();
+ ITuplePartitionComputer probeHpc = new IntervalPartitionComputerFactory(probeKey, state.k,
+ partitionStart, partitionEnd).createPartitioner();
state.partition = partition;
state.intervalPartitions = IntervalPartitionUtil.getMaxPartitions(state.k);
state.memoryForJoin = memsize;
- IIntervalMergeJoinChecker imjc = imjcf.createMergeJoinChecker(buildKeys, probeKeys, partition, rangeState.getRangeMap());
+ IIntervalMergeJoinChecker imjc = imjcf.createMergeJoinChecker(buildKeys, probeKeys, partition, ctx);
state.ipj = new IntervalPartitionJoiner(ctx, state.memoryForJoin, state.k, state.intervalPartitions,
BUILD_REL, PROBE_REL, imjc, buildRd, probeRd, buildHpc, probeHpc);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MergeJoinPOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MergeJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MergeJoinPOperator.java
index c24f3c8..7fa7fdf 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MergeJoinPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MergeJoinPOperator.java
@@ -93,10 +93,18 @@ public class MergeJoinPOperator extends AbstractJoinPOperator {
return mjcf;
}
- public RangeId getRangeId() {
+ public RangeId getLeftRangeId() {
return leftRangeId;
}
+ public RangeId getRightRangeId() {
+ return rightRangeId;
+ }
+
+ public IRangeMap getRangeMapHint() {
+ return rangeMapHint;
+ }
+
@Override
public PhysicalOperatorTag getOperatorTag() {
return PhysicalOperatorTag.MERGE_JOIN;
@@ -113,8 +121,8 @@ public class MergeJoinPOperator extends AbstractJoinPOperator {
for (LogicalVariable v : keysLeftBranch) {
order.add(new OrderColumn(v, mjcf.isOrderAsc() ? OrderKind.ASC : OrderKind.DESC));
}
- IPartitioningProperty pp = new OrderedPartitionedProperty(order, null, leftRangeId, RangePartitioningType.PROJECT,
- rangeMapHint);
+ IPartitioningProperty pp = new OrderedPartitionedProperty(order, null, leftRangeId,
+ RangePartitioningType.PROJECT, rangeMapHint);
List<ILocalStructuralProperty> propsLocal = new ArrayList<>();
propsLocal.add(new LocalOrderProperty(order));
deliveredProperties = new StructuralPropertiesVector(pp, propsLocal);
@@ -176,4 +184,5 @@ public class MergeJoinPOperator extends AbstractJoinPOperator {
ILogicalOperator src2 = op.getInputs().get(1).getValue();
builder.contributeGraphEdge(src2, 0, op, 1);
}
+
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
index 040e663..92d4098 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
@@ -35,6 +35,7 @@ public class OrderedPartitionedProperty implements IPartitioningProperty {
private INodeDomain domain;
private RangeId rangeId;
private RangePartitioningType rangeType;
+ private IRangeMap rangeMapHint;
public OrderedPartitionedProperty(List<OrderColumn> orderColumns, INodeDomain domain, RangeId rangeId,
RangePartitioningType rangeType, IRangeMap rangeMapHint) {
@@ -42,6 +43,7 @@ public class OrderedPartitionedProperty implements IPartitioningProperty {
this.orderColumns = orderColumns;
this.rangeId = rangeId;
this.rangeType = rangeType;
+ this.rangeMapHint = rangeMapHint;
}
public OrderedPartitionedProperty(List<OrderColumn> orderColumns, INodeDomain domain, RangeId rangeId) {
@@ -93,6 +95,10 @@ public class OrderedPartitionedProperty implements IPartitioningProperty {
return rangeId;
}
+ public IRangeMap getRangeMapHint() {
+ return rangeMapHint;
+ }
+
@Override
public INodeDomain getNodeDomain() {
return domain;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
index 353a782..fd340d2 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
@@ -45,6 +45,9 @@ public class OperatorPropertiesUtil {
private static final String MOVABLE = "isMovable";
+ private OperatorPropertiesUtil() {
+ }
+
public static <T> boolean disjoint(Collection<T> c1, Collection<T> c2) {
for (T m : c1) {
if (c2.contains(m)) {
@@ -58,7 +61,7 @@ public class OperatorPropertiesUtil {
private static void getFreeVariablesInOp(ILogicalOperator op, Set<LogicalVariable> freeVars)
throws AlgebricksException {
VariableUtilities.getUsedVariables(op, freeVars);
- HashSet<LogicalVariable> produced = new HashSet<LogicalVariable>();
+ HashSet<LogicalVariable> produced = new HashSet<>();
VariableUtilities.getProducedVariables(op, produced);
for (LogicalVariable v : produced) {
freeVars.remove(v);
@@ -75,13 +78,13 @@ public class OperatorPropertiesUtil {
*/
public static void getFreeVariablesInSelfOrDesc(AbstractLogicalOperator op, Set<LogicalVariable> freeVars)
throws AlgebricksException {
- HashSet<LogicalVariable> produced = new HashSet<LogicalVariable>();
+ HashSet<LogicalVariable> produced = new HashSet<>();
VariableUtilities.getProducedVariables(op, produced);
for (LogicalVariable v : produced) {
freeVars.remove(v);
}
- HashSet<LogicalVariable> used = new HashSet<LogicalVariable>();
+ HashSet<LogicalVariable> used = new HashSet<>();
VariableUtilities.getUsedVariables(op, used);
for (LogicalVariable v : used) {
freeVars.add(v);
@@ -108,7 +111,7 @@ public class OperatorPropertiesUtil {
*/
public static void getFreeVariablesInPath(ILogicalOperator op, ILogicalOperator dest, Set<LogicalVariable> freeVars)
throws AlgebricksException {
- Set<LogicalVariable> producedVars = new ListSet<LogicalVariable>();
+ Set<LogicalVariable> producedVars = new ListSet<>();
VariableUtilities.getLiveVariables(op, freeVars);
collectUsedAndProducedVariablesInPath(op, dest, freeVars, producedVars);
freeVars.removeAll(producedVars);
@@ -163,13 +166,13 @@ public class OperatorPropertiesUtil {
}
public static boolean hasFreeVariablesInSelfOrDesc(AbstractLogicalOperator op) throws AlgebricksException {
- HashSet<LogicalVariable> free = new HashSet<LogicalVariable>();
+ HashSet<LogicalVariable> free = new HashSet<>();
getFreeVariablesInSelfOrDesc(op, free);
return !free.isEmpty();
}
public static boolean hasFreeVariables(ILogicalOperator op) throws AlgebricksException {
- HashSet<LogicalVariable> free = new HashSet<LogicalVariable>();
+ HashSet<LogicalVariable> free = new HashSet<>();
getFreeVariablesInOp(op, free);
return !free.isEmpty();
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 4ec5e27..f3adffd 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -586,9 +586,7 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
List<ILocalStructuralProperty> reqdLocals = required.getLocalProperties();
// Add RangeForwardOperator.
- IRangeMap rangeMap = (IRangeMap) op.getAnnotations()
- .get(OperatorAnnotations.USE_RANGE_CONNECTOR);
- addRangeForwardOperator(op.getInputs().get(i), opp.getRangeId(), rangeMap, context);
+ addRangeForwardOperator(op.getInputs().get(i), opp.getRangeId(), opp.getRangeMapHint(), context);
boolean propWasSet = false;
pop = null;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java
index befaad9..774dd2a 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java
@@ -20,12 +20,25 @@ package org.apache.hyracks.dataflow.std.base;
import java.io.Serializable;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
/**
* Represents a range id in a logical plan.
*/
public final class RangeId implements Serializable {
private static final long serialVersionUID = 1L;
private final int id;
+ private int partition = -1;
+
+ public RangeId(int id, int partition) {
+ this.id = id;
+ this.partition = partition;
+ }
+
+ public RangeId(int id, IHyracksTaskContext ctx) {
+ this.id = id;
+ this.partition = ctx.getTaskAttemptId().getTaskId().getPartition();
+ }
public RangeId(int id) {
this.id = id;
@@ -35,9 +48,17 @@ public final class RangeId implements Serializable {
return id;
}
+ public int getPartition() {
+ return partition;
+ }
+
+ public void setPartition(int partition) {
+ this.partition = partition;
+ }
+
@Override
public String toString() {
- return "RangeId(#" + id + ")";
+ return "RangeId(#" + id + (partition >= 0 ? "," + partition : "") + ")";
}
@Override
@@ -45,7 +66,7 @@ public final class RangeId implements Serializable {
if (!(obj instanceof RangeId)) {
return false;
} else {
- return id == ((RangeId) obj).getId();
+ return id == ((RangeId) obj).getId() && partition == ((RangeId) obj).getPartition();
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java
index 2740a60..c08035a 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java
@@ -50,7 +50,7 @@ public class PartitionRangeDataWriter extends AbstractPartitionDataWriter {
@Override
public void open() throws HyracksDataException {
super.open();
- RangeForwardTaskState rangeState = (RangeForwardTaskState) ctx.getStateObject(rangeId);
+ RangeForwardTaskState rangeState = (RangeForwardTaskState) ctx.getStateObject(new RangeId(rangeId.getId(), ctx));
tpc = trpcf.createPartitioner(rangeState.getRangeMap());
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinCheckerFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinCheckerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinCheckerFactory.java
index 850bf56..d7ac550 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinCheckerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinCheckerFactory.java
@@ -20,13 +20,13 @@ package org.apache.hyracks.dataflow.std.join;
import java.io.Serializable;
-import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public interface IMergeJoinCheckerFactory extends Serializable {
- IMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IRangeMap rangeMap) throws HyracksDataException;
+ IMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IHyracksTaskContext ctx) throws HyracksDataException;
RangePartitioningType getLeftPartitioningType();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java
index 6f0b33b..5624bb5 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java
@@ -203,7 +203,7 @@ public class MergeJoinOperatorDescriptor extends AbstractOperatorDescriptor {
locks.setPartitions(nPartitions);
RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
final IMergeJoinChecker mjc = mergeJoinCheckerFactory.createMergeJoinChecker(leftKeys, rightKeys,
- partition, null);
+ partition, ctx);
return new RightDataOperator(ctx, partition, inRecordDesc, mjc);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NaturalMergeJoinCheckerFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NaturalMergeJoinCheckerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NaturalMergeJoinCheckerFactory.java
index 15df580..abdadb6 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NaturalMergeJoinCheckerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NaturalMergeJoinCheckerFactory.java
@@ -18,9 +18,9 @@
*/
package org.apache.hyracks.dataflow.std.join;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.IRangeMap;
import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator;
@@ -33,7 +33,7 @@ public class NaturalMergeJoinCheckerFactory implements IMergeJoinCheckerFactory
}
@Override
- public IMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IRangeMap rangeMap) {
+ public IMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IHyracksTaskContext ctx) {
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java
index 067246d..04cfca3 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java
@@ -90,7 +90,8 @@ public class RangeForwardOperatorDescriptor extends AbstractOperatorDescriptor {
@Override
public void open() throws HyracksDataException {
- state = new RangeForwardTaskState(ctx.getJobletContext().getJobId(), rangeId, rangeMap);
+ state = new RangeForwardTaskState(ctx.getJobletContext().getJobId(),
+ new RangeId(rangeId.getId(), ctx), rangeMap);
ctx.setStateObject(state);
writer.open();
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
index e032e6a..6d9d085 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
@@ -49,7 +49,6 @@ public abstract class AbstractExternalSortRunMerger {
private final INormalizedKeyComputer nmkComputer;
private final RecordDescriptor recordDesc;
private final int framesLimit;
- private final int MAX_FRAME_SIZE;
private final int topK;
private List<GroupVSizeFrame> inFrames;
private VSizeFrame outputFrame;
@@ -75,14 +74,13 @@ public abstract class AbstractExternalSortRunMerger {
this.recordDesc = recordDesc;
this.framesLimit = framesLimit;
this.writer = writer;
- this.MAX_FRAME_SIZE = FrameConstants.MAX_FRAMESIZE;
this.topK = topK;
}
public void process() throws HyracksDataException {
IFrameWriter finalWriter = null;
try {
- if (runs.size() <= 0) {
+ if (runs.isEmpty()) {
finalWriter = prepareSkipMergingFinalResultWriter(writer);
finalWriter.open();
if (sorter != null) {
@@ -169,9 +167,10 @@ public abstract class AbstractExternalSortRunMerger {
}
}
- private static int selectPartialRuns(int budget, List<GeneratedRunFileReader> runs,
+ private static int selectPartialRuns(int argBudget, List<GeneratedRunFileReader> runs,
List<GeneratedRunFileReader> partialRuns, BitSet runAvailable, int stop) {
partialRuns.clear();
+ int budget = argBudget;
int maxFrameSizeOfGenRun = 0;
int nextRunId = runAvailable.nextSetBit(0);
while (budget > 0 && nextRunId >= 0 && nextRunId < stop) {
@@ -192,13 +191,14 @@ public abstract class AbstractExternalSortRunMerger {
if (extraFreeMem > 0 && partialRuns.size() > 1) {
int extraFrames = extraFreeMem / ctx.getInitialFrameSize();
int avg = (extraFrames / partialRuns.size()) * ctx.getInitialFrameSize();
- int residue = (extraFrames % partialRuns.size());
+ int residue = extraFrames % partialRuns.size();
for (int i = 0; i < residue; i++) {
- partialRuns.get(i).updateSize(Math.min(MAX_FRAME_SIZE,
+ partialRuns.get(i).updateSize(Math.min(FrameConstants.MAX_FRAMESIZE,
partialRuns.get(i).getMaxFrameSize() + avg + ctx.getInitialFrameSize()));
}
for (int i = residue; i < partialRuns.size() && avg > 0; i++) {
- partialRuns.get(i).updateSize(Math.min(MAX_FRAME_SIZE, partialRuns.get(i).getMaxFrameSize() + avg));
+ partialRuns.get(i)
+ .updateSize(Math.min(FrameConstants.MAX_FRAMESIZE, partialRuns.get(i).getMaxFrameSize() + avg));
}
}
@@ -214,17 +214,17 @@ public abstract class AbstractExternalSortRunMerger {
}
}
- abstract protected IFrameWriter prepareSkipMergingFinalResultWriter(IFrameWriter nextWriter)
+ protected abstract IFrameWriter prepareSkipMergingFinalResultWriter(IFrameWriter nextWriter)
throws HyracksDataException;
- abstract protected RunFileWriter prepareIntermediateMergeRunFile() throws HyracksDataException;
+ protected abstract RunFileWriter prepareIntermediateMergeRunFile() throws HyracksDataException;
- abstract protected IFrameWriter prepareIntermediateMergeResultWriter(RunFileWriter mergeFileWriter)
+ protected abstract IFrameWriter prepareIntermediateMergeResultWriter(RunFileWriter mergeFileWriter)
throws HyracksDataException;
- abstract protected IFrameWriter prepareFinalMergeResultWriter(IFrameWriter nextWriter) throws HyracksDataException;
+ protected abstract IFrameWriter prepareFinalMergeResultWriter(IFrameWriter nextWriter) throws HyracksDataException;
- abstract protected int[] getSortFields();
+ protected abstract int[] getSortFields();
private void merge(IFrameWriter writer, List<GeneratedRunFileReader> partialRuns) throws HyracksDataException {
RunMergingFrameReader merger = new RunMergingFrameReader(ctx, partialRuns, inFrames, getSortFields(),