You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by pr...@apache.org on 2016/10/17 19:55:15 UTC
[25/50] [abbrv] asterixdb git commit: Working version of the range
connector and interval join partition.
Working version of the range connector and interval join partition.
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/1487f2be
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/1487f2be
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/1487f2be
Branch: refs/heads/ecarm002/interval_join_merge
Commit: 1487f2be3a364e32e3fee7a10246b6f548f75476
Parents: 13af53a
Author: Preston Carman <pr...@apache.org>
Authored: Wed Aug 24 14:35:57 2016 -0700
Committer: Preston Carman <pr...@apache.org>
Committed: Wed Aug 24 14:35:57 2016 -0700
----------------------------------------------------------------------
.../physical/AbstractIntervalJoinPOperator.java | 56 +++++---
.../IntervalLocalRangeOperatorDescriptor.java | 2 +-
.../IntervalPartitionJoinPOperator.java | 86 +++++++-----
.../asterix/optimizer/base/RuleCollections.java | 2 +-
.../rules/IntervalSplitPartitioningRule.java | 61 ++++----
.../asterix/optimizer/rules/util/JoinUtils.java | 128 ++++++++++++++---
.../translator/util/FunctionCollection.java | 6 +
.../config/AsterixPropertiesAccessor.java | 6 +-
.../om/functions/AsterixBuiltinFunctions.java | 6 +
.../evaluators/common/FunctionManagerImpl.java | 6 +-
.../CalendarDurationFromDateTimeDescriptor.java | 6 +-
.../IntervalPartitionJoinEndDescriptor.java | 58 ++++++++
.../temporal/IntervalPartitionJoinFunction.java | 138 +++++++++++++++++++
.../IntervalPartitionJoinStartDescriptor.java | 58 ++++++++
...rlappingIntervalMergeJoinCheckerFactory.java | 3 +-
.../IntervalPartitionComputerFactory.java | 24 +---
...IntervalPartitionJoinOperatorDescriptor.java | 28 +---
.../IntervalPartitionUtil.java | 22 +++
.../algebra/functions/FunctionIdentifier.java | 3 +-
.../logical/visitors/SchemaVariableVisitor.java | 1 +
.../rules/EnforceStructuralPropertiesRule.java | 3 +-
.../rewriter/rules/IntroduceProjectsRule.java | 18 +--
.../SetAlgebricksPhysicalOperatorsRule.java | 7 +
.../hyracks/dataflow/std/base/RangeId.java | 3 +-
.../connectors/PartitionRangeDataWriter.java | 2 +-
.../misc/RangeForwardOperatorDescriptor.java | 10 ++
26 files changed, 573 insertions(+), 170 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AbstractIntervalJoinPOperator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AbstractIntervalJoinPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AbstractIntervalJoinPOperator.java
index 3be9e80..cc2a022 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AbstractIntervalJoinPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AbstractIntervalJoinPOperator.java
@@ -55,7 +55,7 @@ public abstract class AbstractIntervalJoinPOperator extends AbstractJoinPOperato
private final List<LogicalVariable> keysLeftBranch;
private final List<LogicalVariable> keysRightBranch;
- private final IIntervalMergeJoinCheckerFactory mjcf;
+ protected final IIntervalMergeJoinCheckerFactory mjcf;
private final RangeId leftRangeId;
private final RangeId rightRangeId;
private final IRangeMap rangeMapHint;
@@ -115,14 +115,11 @@ public abstract class AbstractIntervalJoinPOperator extends AbstractJoinPOperato
@Override
public void computeDeliveredProperties(ILogicalOperator iop, IOptimizationContext context) {
- ArrayList<OrderColumn> order = new ArrayList<>();
- for (LogicalVariable v : keysLeftBranch) {
- order.add(new OrderColumn(v, mjcf.isOrderAsc() ? OrderKind.ASC : OrderKind.DESC));
- }
+ ArrayList<OrderColumn> order = getLeftRangeOrderColumn();
IPartitioningProperty pp = new OrderedPartitionedProperty(order, null, leftRangeId,
RangePartitioningType.PROJECT, rangeMapHint);
List<ILocalStructuralProperty> propsLocal = new ArrayList<>();
- propsLocal.add(new LocalOrderProperty(order));
+ propsLocal.add(new LocalOrderProperty(getLeftLocalSortOrderColumn()));
deliveredProperties = new StructuralPropertiesVector(pp, propsLocal);
}
@@ -134,26 +131,17 @@ public abstract class AbstractIntervalJoinPOperator extends AbstractJoinPOperato
IPartitioningProperty ppLeft = null;
List<ILocalStructuralProperty> ispLeft = new ArrayList<>();
+ ispLeft.add(new LocalOrderProperty(getLeftLocalSortOrderColumn()));
+
IPartitioningProperty ppRight = null;
List<ILocalStructuralProperty> ispRight = new ArrayList<>();
-
- ArrayList<OrderColumn> orderLeft = new ArrayList<>();
- for (LogicalVariable v : keysLeftBranch) {
- orderLeft.add(new OrderColumn(v, mjcf.isOrderAsc() ? OrderKind.ASC : OrderKind.DESC));
- }
- ispLeft.add(new LocalOrderProperty(orderLeft));
-
- ArrayList<OrderColumn> orderRight = new ArrayList<>();
- for (LogicalVariable v : keysRightBranch) {
- orderRight.add(new OrderColumn(v, mjcf.isOrderAsc() ? OrderKind.ASC : OrderKind.DESC));
- }
- ispRight.add(new LocalOrderProperty(orderRight));
+ ispRight.add(new LocalOrderProperty(getRightLocalSortOrderColumn()));
if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
- ppLeft = new OrderedPartitionedProperty(orderLeft, null, leftRangeId, mjcf.getLeftPartitioningType(),
- rangeMapHint);
- ppRight = new OrderedPartitionedProperty(orderRight, null, rightRangeId, mjcf.getRightPartitioningType(),
- rangeMapHint);
+ ppLeft = new OrderedPartitionedProperty(getLeftRangeOrderColumn(), null, leftRangeId,
+ mjcf.getLeftPartitioningType(), rangeMapHint);
+ ppRight = new OrderedPartitionedProperty(getRightRangeOrderColumn(), null, rightRangeId,
+ mjcf.getRightPartitioningType(), rangeMapHint);
}
pv[0] = new StructuralPropertiesVector(ppLeft, ispLeft);
@@ -162,6 +150,30 @@ public abstract class AbstractIntervalJoinPOperator extends AbstractJoinPOperato
return new PhysicalRequirements(pv, prc);
}
+ protected ArrayList<OrderColumn> getLeftLocalSortOrderColumn() { // Local sort order of the left input; overridable.
+ return getLeftRangeOrderColumn(); // default: identical to the left range (partitioning) order
+ }
+
+ protected ArrayList<OrderColumn> getRightLocalSortOrderColumn() { // Local sort order of the right input; overridable.
+ return getRightRangeOrderColumn(); // default: identical to the right range (partitioning) order
+ }
+
+ protected ArrayList<OrderColumn> getLeftRangeOrderColumn() { // Builds the order columns for the left join keys.
+ ArrayList<OrderColumn> order = new ArrayList<>();
+ for (LogicalVariable v : keysLeftBranch) { // one OrderColumn per left-branch key variable, in key order
+ order.add(new OrderColumn(v, mjcf.isOrderAsc() ? OrderKind.ASC : OrderKind.DESC)); // direction chosen by the checker factory
+ }
+ return order; // a fresh list on every call
+ }
+
+ protected ArrayList<OrderColumn> getRightRangeOrderColumn() { // Builds the order columns for the right join keys.
+ ArrayList<OrderColumn> orderRight = new ArrayList<>();
+ for (LogicalVariable v : keysRightBranch) { // one OrderColumn per right-branch key variable, in key order
+ orderRight.add(new OrderColumn(v, mjcf.isOrderAsc() ? OrderKind.ASC : OrderKind.DESC)); // direction chosen by the checker factory
+ }
+ return orderRight; // a fresh list on every call
+ }
+
@Override
public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java
index f24dc7c..cf8ad89 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java
@@ -154,7 +154,7 @@ public class IntervalLocalRangeOperatorDescriptor extends AbstractOperatorDescri
writers[i].open();
resultAppender[i] = new FrameTupleAppender(new VSizeFrame(ctx), true);
}
- RangeForwardTaskState rangeState = (RangeForwardTaskState) ctx.getStateObject(new RangeId(rangeId.getId(), ctx));
+ RangeForwardTaskState rangeState = RangeForwardTaskState.getRangeState(rangeId.getId(), ctx);
IRangeMap rangeMap = rangeState.getRangeMap();
nodeRangeStart = getPartitionBoundryStart(rangeMap);
nodeRangeEnd = getPartitionBoundryEnd(rangeMap);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java
index ca9dd69..73d159e 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.algebra.operators.physical;
+import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;
@@ -25,6 +26,8 @@ import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinCheckerFacto
import org.apache.asterix.runtime.operators.joins.intervalpartition.IntervalPartitionJoinOperatorDescriptor;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.value.IRangeMap;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
@@ -32,57 +35,45 @@ import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
import org.apache.hyracks.dataflow.std.base.RangeId;
public class IntervalPartitionJoinPOperator extends AbstractIntervalJoinPOperator {
+ private static final int START = 0;
+ private static final int END = 1;
private final int memSizeInFrames;
- private final long probeTupleCount;
- private final long probeMaxDuration;
- private final long buildTupleCount;
- private final long buildMaxDuration;
- private final int avgTuplesInFrame;
+ private final int k;
+ private final List<LogicalVariable> leftPartitionVar;
+ private final List<LogicalVariable> rightPartitionVar;
private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoinPOperator.class.getName());
public IntervalPartitionJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType,
List<LogicalVariable> sideLeftOfEqualities, List<LogicalVariable> sideRightOfEqualities,
- int memSizeInFrames, long buildTupleCount, long probeTupleCount, long buildMaxDuration,
- long probeMaxDuration, int avgTuplesInFrame, IIntervalMergeJoinCheckerFactory mjcf, RangeId leftRangeId,
- RangeId rightRangeId, IRangeMap rangeMapHint) {
+ int memSizeInFrames, int k, IIntervalMergeJoinCheckerFactory mjcf, List<LogicalVariable> leftPartitionVar,
+ List<LogicalVariable> rightPartitionVar, RangeId leftRangeId, RangeId rightRangeId,
+ IRangeMap rangeMapHint) {
super(kind, partitioningType, sideLeftOfEqualities, sideRightOfEqualities, mjcf, leftRangeId, rightRangeId,
rangeMapHint);
this.memSizeInFrames = memSizeInFrames;
- this.buildTupleCount = buildTupleCount;
- this.probeTupleCount = probeTupleCount;
- this.buildMaxDuration = buildMaxDuration;
- this.probeMaxDuration = probeMaxDuration;
- this.avgTuplesInFrame = avgTuplesInFrame;
+ this.k = k;
+ this.leftPartitionVar = leftPartitionVar;
+ this.rightPartitionVar = rightPartitionVar;
LOGGER.fine("IntervalPartitionJoinPOperator constructed with: JoinKind=" + kind + ", JoinPartitioningType="
+ partitioningType + ", List<LogicalVariable>=" + sideLeftOfEqualities + ", List<LogicalVariable>="
- + sideRightOfEqualities + ", int memSizeInFrames=" + memSizeInFrames + ", int buildTupleCount="
- + buildTupleCount + ", int probeTupleCount=" + probeTupleCount + ", int buildMaxDuration="
- + buildMaxDuration + ", int probeMaxDuration=" + probeMaxDuration + ", int avgTuplesInFrame="
- + avgTuplesInFrame + ", IMergeJoinCheckerFactory mjcf=" + mjcf + ", RangeId leftRangeId=" + leftRangeId
+ + sideRightOfEqualities + ", int memSizeInFrames=" + memSizeInFrames + ", int k=" + k
+ + ", IMergeJoinCheckerFactory mjcf=" + mjcf + ", RangeId leftRangeId=" + leftRangeId
+ ", RangeId rightRangeId=" + rightRangeId + ".");
}
- public long getProbeTupleCount() {
- return probeTupleCount;
+ public int getK() {
+ return k;
}
- public long getProbeMaxDuration() {
- return probeMaxDuration;
+ public List<LogicalVariable> getLeftPartitionVar() {
+ return leftPartitionVar;
}
- public long getBuildTupleCount() {
- return buildTupleCount;
- }
-
- public long getBuildMaxDuration() {
- return buildMaxDuration;
- }
-
- public int getAvgTuplesInFrame() {
- return avgTuplesInFrame;
+ public List<LogicalVariable> getRightPartitionVar() {
+ return rightPartitionVar;
}
@Override
@@ -93,9 +84,36 @@ public class IntervalPartitionJoinPOperator extends AbstractIntervalJoinPOperato
@Override
IOperatorDescriptor getIntervalOperatorDescriptor(int[] keysLeft, int[] keysRight, IOperatorDescriptorRegistry spec,
RecordDescriptor recordDescriptor, IIntervalMergeJoinCheckerFactory mjcf, RangeId rangeId) {
- return new IntervalPartitionJoinOperatorDescriptor(spec, memSizeInFrames, buildTupleCount, probeTupleCount,
- buildMaxDuration, probeMaxDuration, avgTuplesInFrame, keysLeft, keysRight, recordDescriptor, mjcf,
- rangeId);
+ return new IntervalPartitionJoinOperatorDescriptor(spec, memSizeInFrames, k, keysLeft, keysRight,
+ recordDescriptor, mjcf, rangeId);
+ }
+
+ @Override // Local sort order of the left input, built from leftPartitionVar rather than the join keys.
+ protected ArrayList<OrderColumn> getLeftLocalSortOrderColumn() {
+ ArrayList<OrderColumn> order = new ArrayList<>();
+ if (mjcf.isOrderAsc()) { // ascending: variable at index END ascending first, then index START descending
+ order.add(new OrderColumn(leftPartitionVar.get(END), OrderKind.ASC));
+ order.add(new OrderColumn(leftPartitionVar.get(START), OrderKind.DESC));
+ } else { // not ascending: variable at index START ascending first, then index END descending
+ // TODO: clarify what a descending order means for this join; this ordering is provisional.
+ order.add(new OrderColumn(leftPartitionVar.get(START), OrderKind.ASC));
+ order.add(new OrderColumn(leftPartitionVar.get(END), OrderKind.DESC));
+ }
+ return order; // always exactly two columns
+ }
+
+ @Override // Local sort order of the right input, built from rightPartitionVar rather than the join keys.
+ protected ArrayList<OrderColumn> getRightLocalSortOrderColumn() {
+ ArrayList<OrderColumn> order = new ArrayList<>();
+ if (mjcf.isOrderAsc()) { // ascending: variable at index END ascending first, then index START descending
+ order.add(new OrderColumn(rightPartitionVar.get(END), OrderKind.ASC));
+ order.add(new OrderColumn(rightPartitionVar.get(START), OrderKind.DESC));
+ } else { // not ascending: variable at index START ascending first, then index END descending
+ // TODO: clarify what a descending order means for this join; this ordering is provisional.
+ order.add(new OrderColumn(rightPartitionVar.get(START), OrderKind.ASC));
+ order.add(new OrderColumn(rightPartitionVar.get(END), OrderKind.DESC));
+ }
+ return order; // always exactly two columns
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index 521a80e..a4959ab 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -335,7 +335,7 @@ public final class RuleCollections {
prepareForJobGenRewrites
.add(new IsolateHyracksOperatorsRule(HeuristicOptimizer.hyraxOperatorsBelowWhichJobGenIsDisabled));
prepareForJobGenRewrites.add(new ExtractCommonOperatorsRule());
- prepareForJobGenRewrites.add(new IntervalSplitPartitioningRule());
+ //prepareForJobGenRewrites.add(new IntervalSplitPartitioningRule());
// Re-infer all types, so that, e.g., the effect of not-is-null is
// propagated.
prepareForJobGenRewrites.add(new ReinferAllTypesRule());
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java
index f0ff610..595b994 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java
@@ -45,6 +45,7 @@ import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCa
import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -55,6 +56,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOpe
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractJoinPOperator.JoinPartitioningType;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.MaterializePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.MergeJoinPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.NestedLoopJoinPOperator;
@@ -223,65 +225,65 @@ public class IntervalSplitPartitioningRule implements IAlgebraicRewriteRule {
// Connect main path
connectOperators(leftIntervalSplitRef, leftSortedInput, context);
- context.computeAndSetTypeEnvironmentForOperator(leftIntervalSplitRef.getValue());
+ updateOperatorContext(context, leftIntervalSplitRef);
connectOperators(leftMaterialize0Ref, leftIntervalSplitRef, context);
- context.computeAndSetTypeEnvironmentForOperator(leftMaterialize0Ref.getValue());
+ updateOperatorContext(context, leftMaterialize0Ref);
connectOperators(leftMaterialize1Ref, leftIntervalSplitRef, context);
- context.computeAndSetTypeEnvironmentForOperator(leftMaterialize1Ref.getValue());
+ updateOperatorContext(context, leftMaterialize1Ref);
connectOperators(leftMaterialize2Ref, leftIntervalSplitRef, context);
- context.computeAndSetTypeEnvironmentForOperator(leftMaterialize2Ref.getValue());
+ updateOperatorContext(context, leftMaterialize2Ref);
connectOperators(leftStartsSplitRef, leftMaterialize0Ref, context);
- context.computeAndSetTypeEnvironmentForOperator(leftStartsSplitRef.getValue());
+ updateOperatorContext(context, leftStartsSplitRef);
connectOperators(rightIntervalSplitRef, rightSortedInput, context);
- context.computeAndSetTypeEnvironmentForOperator(rightIntervalSplitRef.getValue());
+ updateOperatorContext(context, rightIntervalSplitRef);
connectOperators(rightMaterialize0Ref, rightIntervalSplitRef, context);
- context.computeAndSetTypeEnvironmentForOperator(rightMaterialize0Ref.getValue());
+ updateOperatorContext(context, rightMaterialize0Ref);
connectOperators(rightMaterialize1Ref, rightIntervalSplitRef, context);
- context.computeAndSetTypeEnvironmentForOperator(rightMaterialize1Ref.getValue());
+ updateOperatorContext(context, rightMaterialize1Ref);
connectOperators(rightMaterialize2Ref, rightIntervalSplitRef, context);
- context.computeAndSetTypeEnvironmentForOperator(rightMaterialize2Ref.getValue());
+ updateOperatorContext(context, rightMaterialize2Ref);
connectOperators(rightStartsSplitRef, rightMaterialize0Ref, context);
- context.computeAndSetTypeEnvironmentForOperator(rightStartsSplitRef.getValue());
+ updateOperatorContext(context, rightStartsSplitRef);
// Connect left and right starts path
connectOperators(startsJoinRef, leftStartsSplitRef, context);
connectOperators(startsJoinRef, rightStartsSplitRef, context);
- context.computeAndSetTypeEnvironmentForOperator(startsJoinRef.getValue());
+ updateOperatorContext(context, startsJoinRef);
// Connect left ends path
connectOperators(leftEndsJoinRef, leftMaterialize1Ref, context);
connectOperators(leftEndsJoinRef, rightStartsSplitRef, context);
- context.computeAndSetTypeEnvironmentForOperator(leftEndsJoinRef.getValue());
+ updateOperatorContext(context, leftEndsJoinRef);
connectOperators(union1Ref, startsJoinRef, context);
connectOperators(union1Ref, leftEndsJoinRef, context);
- context.computeAndSetTypeEnvironmentForOperator(union1Ref.getValue());
+ updateOperatorContext(context, union1Ref);
// Connect left covers path
connectOperators(leftCoversJoinRef, leftMaterialize2Ref, context);
connectOperators(leftCoversJoinRef, rightStartsSplitRef, context);
- context.computeAndSetTypeEnvironmentForOperator(leftCoversJoinRef.getValue());
+ updateOperatorContext(context, leftCoversJoinRef);
connectOperators(union2Ref, union1Ref, context);
connectOperators(union2Ref, leftCoversJoinRef, context);
- context.computeAndSetTypeEnvironmentForOperator(union2Ref.getValue());
+ updateOperatorContext(context, union2Ref);
// Connect right ends path
connectOperators(rightEndsJoinRef, leftStartsSplitRef, context);
connectOperators(rightEndsJoinRef, rightMaterialize1Ref, context);
- context.computeAndSetTypeEnvironmentForOperator(rightEndsJoinRef.getValue());
+ updateOperatorContext(context, rightEndsJoinRef);
connectOperators(union3Ref, union2Ref, context);
connectOperators(union3Ref, rightEndsJoinRef, context);
- context.computeAndSetTypeEnvironmentForOperator(union3Ref.getValue());
+ updateOperatorContext(context, union3Ref);
// Connect right covers path
connectOperators(rightCoversJoinRef, leftStartsSplitRef, context);
connectOperators(rightCoversJoinRef, rightMaterialize2Ref, context);
- context.computeAndSetTypeEnvironmentForOperator(rightCoversJoinRef.getValue());
+ updateOperatorContext(context, rightCoversJoinRef);
connectOperators(union4Ref, union3Ref, context);
connectOperators(union4Ref, rightCoversJoinRef, context);
- context.computeAndSetTypeEnvironmentForOperator(union4Ref.getValue());
+ updateOperatorContext(context, union4Ref);
// Update context
opRef.setValue(union4Ref.getValue());
@@ -299,6 +301,14 @@ public class IntervalSplitPartitioningRule implements IAlgebraicRewriteRule {
return true;
}
+ // Refreshes the optimizer state for the operator held by operatorRef: recomputes its type environment,
+ // then passes it to OperatorPropertiesUtil.computeSchemaAndPropertiesRecIfNull.
+ private void updateOperatorContext(IOptimizationContext context, Mutable<ILogicalOperator> operatorRef)
+ throws AlgebricksException {
+ context.computeAndSetTypeEnvironmentForOperator(operatorRef.getValue());
+ // Cast the wrapped operator, not the Mutable reference itself: the reference is not an
+ // AbstractLogicalOperator, so casting it directly would fail with a ClassCastException.
+ OperatorPropertiesUtil.computeSchemaAndPropertiesRecIfNull(
+ (AbstractLogicalOperator) operatorRef.getValue(), context);
+ }
+
private LogicalVariable getSortKey(ILogicalOperator op) {
if (op.getOperatorTag() != LogicalOperatorTag.ORDER) {
return null;
@@ -341,6 +351,7 @@ public class IntervalSplitPartitioningRule implements IAlgebraicRewriteRule {
child.getValue().getInputs().add(eoRef);
context.computeAndSetTypeEnvironmentForOperator(eo);
context.computeAndSetTypeEnvironmentForOperator(child.getValue());
+ OperatorPropertiesUtil.computeSchemaAndPropertiesRecIfNull((AbstractLogicalOperator) eo, context);
} else {
if (parent.getValue().getOperatorTag() == LogicalOperatorTag.REPLICATE) {
ReplicateOperator ro = (ReplicateOperator) parent.getValue();
@@ -418,7 +429,8 @@ public class IntervalSplitPartitioningRule implements IAlgebraicRewriteRule {
MergeJoinPOperator mjpo = (MergeJoinPOperator) joinPo;
MergeJoinPOperator mjpoClone = new MergeJoinPOperator(mjpo.getKind(), mjpo.getPartitioningType(),
mjpo.getKeysLeftBranch(), mjpo.getKeysRightBranch(), memoryJoinSize,
- mjpo.getMergeJoinCheckerFactory(), mjpo.getLeftRangeId(), mjpo.getRightRangeId(), null);
+ mjpo.getMergeJoinCheckerFactory(), mjpo.getLeftRangeId(), mjpo.getRightRangeId(),
+ mjpo.getRangeMapHint());
ijoClone.setPhysicalOperator(mjpoClone);
} else if (joinPo.getOperatorTag() == PhysicalOperatorTag.EXTENSION_OPERATOR) {
if (joinPo instanceof IntervalIndexJoinPOperator) {
@@ -426,16 +438,15 @@ public class IntervalSplitPartitioningRule implements IAlgebraicRewriteRule {
IntervalIndexJoinPOperator iijpoClone = new IntervalIndexJoinPOperator(iijpo.getKind(),
iijpo.getPartitioningType(), iijpo.getKeysLeftBranch(), iijpo.getKeysRightBranch(),
memoryJoinSize, iijpo.getIntervalMergeJoinCheckerFactory(), iijpo.getLeftRangeId(),
- iijpo.getRightRangeId(), null);
+ iijpo.getRightRangeId(), iijpo.getRangeMapHint());
ijoClone.setPhysicalOperator(iijpoClone);
} else if (joinPo instanceof IntervalPartitionJoinPOperator) {
IntervalPartitionJoinPOperator ipjpo = (IntervalPartitionJoinPOperator) joinPo;
IntervalPartitionJoinPOperator iijpoClone = new IntervalPartitionJoinPOperator(ipjpo.getKind(),
ipjpo.getPartitioningType(), ipjpo.getKeysLeftBranch(), ipjpo.getKeysRightBranch(),
- memoryJoinSize, ipjpo.getBuildTupleCount(), ipjpo.getProbeTupleCount(),
- ipjpo.getBuildMaxDuration(), ipjpo.getProbeMaxDuration(), ipjpo.getAvgTuplesInFrame(),
- ipjpo.getIntervalMergeJoinCheckerFactory(), ipjpo.getLeftRangeId(), ipjpo.getRightRangeId(),
- null);
+ memoryJoinSize, ipjpo.getK(), ipjpo.getIntervalMergeJoinCheckerFactory(),
+ ipjpo.getLeftPartitionVar(), ipjpo.getRightPartitionVar(), ipjpo.getLeftRangeId(),
+ ipjpo.getRightRangeId(), ipjpo.getRangeMapHint());
ijoClone.setPhysicalOperator(iijpoClone);
} else {
return null;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java
index 795ee82..fb70aea 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java
@@ -18,17 +18,23 @@
*/
package org.apache.asterix.optimizer.rules.util;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.algebra.operators.physical.IntervalIndexJoinPOperator;
import org.apache.asterix.algebra.operators.physical.IntervalPartitionJoinPOperator;
import org.apache.asterix.common.annotations.IntervalJoinExpressionAnnotation;
+import org.apache.asterix.lang.common.util.FunctionUtil;
+import org.apache.asterix.om.base.AInt32;
+import org.apache.asterix.om.constants.AsterixConstantValue;
import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
import org.apache.asterix.runtime.operators.joins.AfterIntervalMergeJoinCheckerFactory;
import org.apache.asterix.runtime.operators.joins.BeforeIntervalMergeJoinCheckerFactory;
@@ -44,23 +50,38 @@ import org.apache.asterix.runtime.operators.joins.OverlappingIntervalMergeJoinCh
import org.apache.asterix.runtime.operators.joins.OverlapsIntervalMergeJoinCheckerFactory;
import org.apache.asterix.runtime.operators.joins.StartedByIntervalMergeJoinCheckerFactory;
import org.apache.asterix.runtime.operators.joins.StartsIntervalMergeJoinCheckerFactory;
+import org.apache.asterix.runtime.operators.joins.intervalpartition.IntervalPartitionUtil;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionAnnotation;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.RangeForwardOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractJoinPOperator.JoinPartitioningType;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.MergeJoinPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangeForwardPOperator;
+import org.apache.hyracks.api.dataflow.value.IRangeMap;
import org.apache.hyracks.dataflow.std.base.RangeId;
import org.apache.hyracks.dataflow.std.join.IMergeJoinCheckerFactory;
public class JoinUtils {
+ private static final int LEFT = 0;
+ private static final int RIGHT = 1;
+
private static final Logger LOGGER = Logger.getLogger(JoinUtils.class.getName());
private static final Map<FunctionIdentifier, FunctionIdentifier> INTERVAL_JOIN_CONDITIONS = new HashMap<>();
@@ -98,8 +119,8 @@ public class JoinUtils {
}
List<LogicalVariable> sideLeft = new LinkedList<>();
List<LogicalVariable> sideRight = new LinkedList<>();
- List<LogicalVariable> varsLeft = op.getInputs().get(0).getValue().getSchema();
- List<LogicalVariable> varsRight = op.getInputs().get(1).getValue().getSchema();
+ List<LogicalVariable> varsLeft = op.getInputs().get(LEFT).getValue().getSchema();
+ List<LogicalVariable> varsRight = op.getInputs().get(RIGHT).getValue().getSchema();
AbstractFunctionCallExpression fexp = (AbstractFunctionCallExpression) conditionLE;
FunctionIdentifier fi = isIntervalJoinCondition(fexp, varsLeft, varsRight, sideLeft, sideRight);
if (fi != null) {
@@ -140,43 +161,116 @@ public class JoinUtils {
private static void setSortMergeIntervalJoinOp(AbstractBinaryJoinOperator op, FunctionIdentifier fi,
List<LogicalVariable> sideLeft, List<LogicalVariable> sideRight, IntervalJoinExpressionAnnotation ijea,
- IOptimizationContext context) {
+ IOptimizationContext context) throws AlgebricksException {
RangeId leftRangeId = context.newRangeId();
+ RangeId rightRangeId = context.newRangeId();
+ insertRangeForward(op, LEFT, leftRangeId, ijea.getRangeMap(), context);
+ insertRangeForward(op, RIGHT, rightRangeId, ijea.getRangeMap(), context);
+
IMergeJoinCheckerFactory mjcf = getIntervalMergeJoinCheckerFactory(fi, leftRangeId);
op.setPhysicalOperator(new MergeJoinPOperator(op.getJoinKind(), JoinPartitioningType.BROADCAST, sideLeft,
sideRight, context.getPhysicalOptimizationConfig().getMaxFramesForJoin(), mjcf, leftRangeId,
- context.newRangeId(), ijea.getRangeMap()));
+ rightRangeId, ijea.getRangeMap()));
}
private static void setIntervalPartitionJoinOp(AbstractBinaryJoinOperator op, FunctionIdentifier fi,
- List<LogicalVariable> sideLeft, List<LogicalVariable> sideRight, IntervalJoinExpressionAnnotation ijea,
- IOptimizationContext context) {
- long leftCount = ijea.getLeftRecordCount() > 0 ? ijea.getLeftRecordCount() : getCardinality(sideLeft, context);
+ List<LogicalVariable> leftKeys, List<LogicalVariable> rightKeys, IntervalJoinExpressionAnnotation ijea,
+ IOptimizationContext context) throws AlgebricksException {
+ long leftCount = ijea.getLeftRecordCount() > 0 ? ijea.getLeftRecordCount() : getCardinality(leftKeys, context);
long rightCount = ijea.getRightRecordCount() > 0 ? ijea.getRightRecordCount()
- : getCardinality(sideRight, context);
+ : getCardinality(rightKeys, context);
long leftMaxDuration = ijea.getLeftMaxDuration() > 0 ? ijea.getLeftMaxDuration()
- : getMaxDuration(sideLeft, context);
+ : getMaxDuration(leftKeys, context);
long rightMaxDuration = ijea.getRightMaxDuration() > 0 ? ijea.getRightMaxDuration()
- : getMaxDuration(sideRight, context);
+ : getMaxDuration(rightKeys, context);
int tuplesPerFrame = ijea.getTuplesPerFrame() > 0 ? ijea.getTuplesPerFrame()
: context.getPhysicalOptimizationConfig().getMaxRecordsPerFrame();
+ int k = IntervalPartitionUtil.determineK(leftCount, leftMaxDuration, rightCount, rightMaxDuration,
+ tuplesPerFrame);
+ if (k <= 2) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("IntervalPartitionJoin has overridden the suggested value of k (" + k + ") with 3.");
+ }
+ k = 3; // Override only after logging so the warning reports the originally suggested k.
+ }
+
RangeId leftRangeId = context.newRangeId();
+ RangeId rightRangeId = context.newRangeId();
+ insertRangeForward(op, LEFT, leftRangeId, ijea.getRangeMap(), context);
+ insertRangeForward(op, RIGHT, rightRangeId, ijea.getRangeMap(), context);
+
+ List<LogicalVariable> leftPartitionVar = Arrays.asList(context.newVar(), context.newVar());
+ List<LogicalVariable> rightPartitionVar = Arrays.asList(context.newVar(), context.newVar());
+ insertPartitionSortKey(op, LEFT, leftPartitionVar, leftKeys.get(0), leftRangeId, k, context);
+ insertPartitionSortKey(op, RIGHT, rightPartitionVar, rightKeys.get(0), rightRangeId, k, context);
+
IIntervalMergeJoinCheckerFactory mjcf = getIntervalMergeJoinCheckerFactory(fi, leftRangeId);
op.setPhysicalOperator(new IntervalPartitionJoinPOperator(op.getJoinKind(), JoinPartitioningType.BROADCAST,
- sideLeft, sideRight, context.getPhysicalOptimizationConfig().getMaxFramesForJoin(), leftCount,
- rightCount, leftMaxDuration, rightMaxDuration, tuplesPerFrame, mjcf, leftRangeId, context.newRangeId(),
- ijea.getRangeMap()));
+ leftKeys, rightKeys, context.getPhysicalOptimizationConfig().getMaxFramesForJoin(), k, mjcf,
+ leftPartitionVar, rightPartitionVar, leftRangeId, rightRangeId, ijea.getRangeMap()));
+ }
+
+ private static void insertRangeForward(AbstractBinaryJoinOperator op, int branch, RangeId rangeId,
+ IRangeMap rangeMap, IOptimizationContext context) throws AlgebricksException {
+ RangeForwardOperator rfo = new RangeForwardOperator(rangeId, rangeMap);
+ rfo.setExecutionMode(op.getExecutionMode());
+ rfo.getInputs().add(op.getInputs().get(branch));
+ RangeForwardPOperator rfpo = new RangeForwardPOperator(rangeId, rangeMap);
+ rfo.setPhysicalOperator(rfpo);
+ Mutable<ILogicalOperator> rfoRef = new MutableObject<>(rfo);
+ op.getInputs().set(branch, rfoRef);
+
+ context.computeAndSetTypeEnvironmentForOperator(rfo);
+ }
+
+ private static void insertPartitionSortKey(AbstractBinaryJoinOperator op, int branch,
+ List<LogicalVariable> partitionVars, LogicalVariable intervalVar, RangeId rangeId, int k,
+ IOptimizationContext context) throws AlgebricksException {
+ MutableObject<ILogicalExpression> intervalExp = new MutableObject<>(
+ new VariableReferenceExpression(intervalVar));
+ MutableObject<ILogicalExpression> rangeIdConstant = new MutableObject<>(
+ new ConstantExpression(new AsterixConstantValue(new AInt32(rangeId.getId()))));
+ MutableObject<ILogicalExpression> kConstant = new MutableObject<>(
+ new ConstantExpression(new AsterixConstantValue(new AInt32(k))));
+
+ List<Mutable<ILogicalExpression>> assignExps = new ArrayList<>();
+ // Start partition
+ IFunctionInfo startFi = FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.INTERVAL_PARTITION_JOIN_START);
+ @SuppressWarnings("unchecked")
+ ScalarFunctionCallExpression startPartitionExp = new ScalarFunctionCallExpression(startFi, intervalExp,
+ rangeIdConstant, kConstant);
+ assignExps.add(new MutableObject<ILogicalExpression>(startPartitionExp));
+ // End partition
+ IFunctionInfo endFi = FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.INTERVAL_PARTITION_JOIN_END);
+ @SuppressWarnings("unchecked")
+ ScalarFunctionCallExpression endPartitionExp = new ScalarFunctionCallExpression(endFi, intervalExp,
+ rangeIdConstant, kConstant);
+ assignExps.add(new MutableObject<ILogicalExpression>(endPartitionExp));
+
+ AssignOperator ao = new AssignOperator(partitionVars, assignExps);
+ ao.setExecutionMode(op.getExecutionMode());
+ AssignPOperator apo = new AssignPOperator();
+ ao.setPhysicalOperator(apo);
+ Mutable<ILogicalOperator> aoRef = new MutableObject<>(ao);
+ ao.getInputs().add(op.getInputs().get(branch));
+ op.getInputs().set(branch, aoRef);
+
+ context.computeAndSetTypeEnvironmentForOperator(ao);
}
private static void setIntervalIndexJoinOp(AbstractBinaryJoinOperator op, FunctionIdentifier fi,
List<LogicalVariable> sideLeft, List<LogicalVariable> sideRight, IntervalJoinExpressionAnnotation ijea,
- IOptimizationContext context) {
+ IOptimizationContext context) throws AlgebricksException {
RangeId leftRangeId = context.newRangeId();
+ RangeId rightRangeId = context.newRangeId();
+ insertRangeForward(op, LEFT, leftRangeId, ijea.getRangeMap(), context);
+ insertRangeForward(op, RIGHT, rightRangeId, ijea.getRangeMap(), context);
+
IIntervalMergeJoinCheckerFactory mjcf = getIntervalMergeJoinCheckerFactory(fi, leftRangeId);
op.setPhysicalOperator(new IntervalIndexJoinPOperator(op.getJoinKind(), JoinPartitioningType.BROADCAST,
sideLeft, sideRight, context.getPhysicalOptimizationConfig().getMaxFramesForJoin(), mjcf, leftRangeId,
- context.newRangeId(), ijea.getRangeMap()));
+ rightRangeId, ijea.getRangeMap()));
}
private static int getMaxDuration(List<LogicalVariable> lv, IOptimizationContext context) {
@@ -202,8 +296,8 @@ public class JoinUtils {
} else {
return null;
}
- ILogicalExpression opLeft = fexp.getArguments().get(0).getValue();
- ILogicalExpression opRight = fexp.getArguments().get(1).getValue();
+ ILogicalExpression opLeft = fexp.getArguments().get(LEFT).getValue();
+ ILogicalExpression opRight = fexp.getArguments().get(RIGHT).getValue();
if (opLeft.getExpressionTag() != LogicalExpressionTag.VARIABLE
|| opRight.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
return null;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/FunctionCollection.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/FunctionCollection.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/FunctionCollection.java
index 69b9853..71719d5 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/FunctionCollection.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/FunctionCollection.java
@@ -261,6 +261,8 @@ import org.apache.asterix.runtime.evaluators.functions.temporal.IntervalMetByDes
import org.apache.asterix.runtime.evaluators.functions.temporal.IntervalOverlappedByDescriptor;
import org.apache.asterix.runtime.evaluators.functions.temporal.IntervalOverlappingDescriptor;
import org.apache.asterix.runtime.evaluators.functions.temporal.IntervalOverlapsDescriptor;
+import org.apache.asterix.runtime.evaluators.functions.temporal.IntervalPartitionJoinEndDescriptor;
+import org.apache.asterix.runtime.evaluators.functions.temporal.IntervalPartitionJoinStartDescriptor;
import org.apache.asterix.runtime.evaluators.functions.temporal.IntervalStartedByDescriptor;
import org.apache.asterix.runtime.evaluators.functions.temporal.IntervalStartsDescriptor;
import org.apache.asterix.runtime.evaluators.functions.temporal.MillisecondsFromDayTimeDurationDescriptor;
@@ -401,6 +403,10 @@ public class FunctionCollection {
temp.add(SimilarityJaccardPrefixDescriptor.FACTORY);
temp.add(SimilarityJaccardPrefixCheckDescriptor.FACTORY);
+ // Partition functions for interval partition join pre-sorting
+ temp.add(IntervalPartitionJoinStartDescriptor.FACTORY);
+ temp.add(IntervalPartitionJoinEndDescriptor.FACTORY);
+
// functions that need generated class for null-handling.
List<IFunctionDescriptorFactory> functionsToInjectUnkownHandling = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
index 7309f0c..f4342ca 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
@@ -79,7 +79,7 @@ public class AsterixPropertiesAccessor {
fileName = GlobalConfig.DEFAULT_CONFIG_FILE_NAME;
is = new FileInputStream(fileName);
} catch (FileNotFoundException fnf) {
- throw new AsterixException("Could not find configuration file " + fileName);
+ throw new AsterixException("Could not find configuration file " + fileName, fnf);
}
}
@@ -90,7 +90,7 @@ public class AsterixPropertiesAccessor {
Unmarshaller unmarshaller = ctx.createUnmarshaller();
asterixConfiguration = (AsterixConfiguration) unmarshaller.unmarshal(is);
} catch (JAXBException e) {
- throw new AsterixException("Failed to read configuration file " + fileName);
+ throw new AsterixException("Failed to read configuration file " + fileName, e);
}
instanceName = asterixConfiguration.getInstanceName();
metadataNodeName = asterixConfiguration.getMetadataNode();
@@ -117,7 +117,7 @@ public class AsterixPropertiesAccessor {
nodePartitionsMap.put(store.getNcId(), nodePartitions);
nodeNames.add(store.getNcId());
}
- asterixConfigurationParams = new HashMap<String, Property>();
+ asterixConfigurationParams = new HashMap<>();
for (Property p : asterixConfiguration.getProperty()) {
asterixConfigurationParams.put(p.getName(), p);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/AsterixBuiltinFunctions.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/AsterixBuiltinFunctions.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/AsterixBuiltinFunctions.java
index e5950e3..2d159a6 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/AsterixBuiltinFunctions.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/AsterixBuiltinFunctions.java
@@ -525,6 +525,10 @@ public class AsterixBuiltinFunctions {
"interval-ends", 2);
public static final FunctionIdentifier INTERVAL_ENDED_BY = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
"interval-ended-by", 2);
+ public static final FunctionIdentifier INTERVAL_PARTITION_JOIN_START = new FunctionIdentifier(
+ FunctionConstants.ASTERIX_NS, "interval-partition-join-start", 3);
+ public static final FunctionIdentifier INTERVAL_PARTITION_JOIN_END = new FunctionIdentifier(
+ FunctionConstants.ASTERIX_NS, "interval-partition-join-end", 3);
public static final FunctionIdentifier CURRENT_TIME = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
"current-time", 0);
public static final FunctionIdentifier CURRENT_DATE = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
@@ -1009,6 +1013,8 @@ public class AsterixBuiltinFunctions {
addFunction(INTERVAL_COVERED_BY, ABooleanTypeComputer.INSTANCE, true);
addFunction(INTERVAL_ENDS, ABooleanTypeComputer.INSTANCE, true);
addFunction(INTERVAL_ENDED_BY, ABooleanTypeComputer.INSTANCE, true);
+ addPrivateFunction(INTERVAL_PARTITION_JOIN_START, AInt32TypeComputer.INSTANCE, true);
+ addPrivateFunction(INTERVAL_PARTITION_JOIN_END, AInt32TypeComputer.INSTANCE, true);
addFunction(CURRENT_DATE, ADateTypeComputer.INSTANCE, false);
addFunction(CURRENT_TIME, ATimeTypeComputer.INSTANCE, false);
addFunction(CURRENT_DATETIME, ADateTimeTypeComputer.INSTANCE, false);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/FunctionManagerImpl.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/FunctionManagerImpl.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/FunctionManagerImpl.java
index 1bbd745..034cfeb 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/FunctionManagerImpl.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/FunctionManagerImpl.java
@@ -35,12 +35,12 @@ public class FunctionManagerImpl implements IFunctionManager {
private final Map<Pair<FunctionIdentifier, Integer>, IFunctionDescriptorFactory> functions;
public FunctionManagerImpl() {
- functions = new HashMap<Pair<FunctionIdentifier, Integer>, IFunctionDescriptorFactory>();
+ functions = new HashMap<>();
}
@Override
public synchronized IFunctionDescriptor lookupFunction(FunctionIdentifier fid) throws AlgebricksException {
- Pair<FunctionIdentifier, Integer> key = new Pair<FunctionIdentifier, Integer>(fid, fid.getArity());
+ Pair<FunctionIdentifier, Integer> key = new Pair<>(fid, fid.getArity());
IFunctionDescriptorFactory factory = functions.get(key);
if (factory == null) {
throw new AlgebricksException("Inappropriate use of function " + "'" + fid.getName() + "'");
@@ -58,7 +58,7 @@ public class FunctionManagerImpl implements IFunctionManager {
public synchronized void unregisterFunction(IFunctionDescriptorFactory descriptorFactory)
throws AlgebricksException {
FunctionIdentifier fid = descriptorFactory.createFunctionDescriptor().getIdentifier();
- Pair<FunctionIdentifier, Integer> key = new Pair<FunctionIdentifier, Integer>(fid, fid.getArity());
+ Pair<FunctionIdentifier, Integer> key = new Pair<>(fid, fid.getArity());
functions.remove(key);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/CalendarDurationFromDateTimeDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/CalendarDurationFromDateTimeDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/CalendarDurationFromDateTimeDescriptor.java
index c42865c..b053d99 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/CalendarDurationFromDateTimeDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/CalendarDurationFromDateTimeDescriptor.java
@@ -63,9 +63,9 @@ import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
* <p/>
*/
public class CalendarDurationFromDateTimeDescriptor extends AbstractScalarFunctionDynamicDescriptor {
- private final static long serialVersionUID = 1L;
- public final static FunctionIdentifier FID = AsterixBuiltinFunctions.CALENDAR_DURATION_FROM_DATETIME;
- public final static IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ private static final long serialVersionUID = 1L;
+ public static final FunctionIdentifier FID = AsterixBuiltinFunctions.CALENDAR_DURATION_FROM_DATETIME;
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
@Override
public IFunctionDescriptor createFunctionDescriptor() {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/IntervalPartitionJoinEndDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/IntervalPartitionJoinEndDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/IntervalPartitionJoinEndDescriptor.java
new file mode 100644
index 0000000..8d737f4
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/IntervalPartitionJoinEndDescriptor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.evaluators.functions.temporal;
+
+import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptor;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+public class IntervalPartitionJoinEndDescriptor extends AbstractScalarFunctionDynamicDescriptor {
+ private static final long serialVersionUID = 1L;
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ @Override
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new IntervalPartitionJoinEndDescriptor();
+ }
+ };
+
+ @Override
+ public IScalarEvaluatorFactory createEvaluatorFactory(final IScalarEvaluatorFactory[] args)
+ throws AlgebricksException {
+ return new IScalarEvaluatorFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws AlgebricksException {
+ return new IntervalPartitionJoinFunction(args, ctx, false);
+ }
+ };
+ }
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return AsterixBuiltinFunctions.INTERVAL_PARTITION_JOIN_END;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/IntervalPartitionJoinFunction.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/IntervalPartitionJoinFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/IntervalPartitionJoinFunction.java
new file mode 100644
index 0000000..4725fa5
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/IntervalPartitionJoinFunction.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.evaluators.functions.temporal;
+
+import java.io.DataOutput;
+
+import org.apache.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.AIntervalSerializerDeserializer;
+import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import org.apache.asterix.om.base.AInt32;
+import org.apache.asterix.om.base.AMutableInt32;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.EnumDeserializer;
+import org.apache.asterix.runtime.operators.joins.intervalpartition.IntervalPartitionUtil;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import org.apache.hyracks.dataflow.std.misc.RangeForwardOperatorDescriptor.RangeForwardTaskState;
+
+public class IntervalPartitionJoinFunction implements IScalarEvaluator {
+
+ private ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
+ private DataOutput out = resultStorage.getDataOutput();
+ private IPointable argPtr0 = new VoidPointable();
+ private IPointable argPtr1 = new VoidPointable();
+ private IPointable argPtr2 = new VoidPointable();
+ private int rangeIdCache = -1;
+ private long partitionStart;
+ private long partitionDuration;
+
+ @SuppressWarnings("unchecked")
+ private ISerializerDeserializer<AInt32> intSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AINT32);
+ private AMutableInt32 aInt = new AMutableInt32(0);
+
+ private IHyracksTaskContext ctx;
+ private IScalarEvaluator eval0;
+ private IScalarEvaluator eval1;
+ private IScalarEvaluator eval2;
+ private boolean startPoint;
+
+ public IntervalPartitionJoinFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx, boolean startPoint)
+ throws AlgebricksException {
+ this.ctx = ctx;
+ this.eval0 = args[0].createScalarEvaluator(ctx);
+ this.eval1 = args[1].createScalarEvaluator(ctx);
+ this.eval2 = args[2].createScalarEvaluator(ctx);
+ this.startPoint = startPoint;
+ }
+
+ public void evaluate(IFrameTupleReference tuple, IPointable result) throws AlgebricksException {
+ resultStorage.reset();
+ // Interval
+ eval0.evaluate(tuple, argPtr0);
+ // rangeId
+ eval1.evaluate(tuple, argPtr1);
+ // k
+ eval2.evaluate(tuple, argPtr2);
+
+ byte[] bytes0 = argPtr0.getByteArray();
+ int offset0 = argPtr0.getStartOffset();
+ byte[] bytes1 = argPtr1.getByteArray();
+ int offset1 = argPtr1.getStartOffset();
+ byte[] bytes2 = argPtr2.getByteArray();
+ int offset2 = argPtr2.getStartOffset();
+
+ try {
+ if (bytes0[offset0] != ATypeTag.SERIALIZED_INTERVAL_TYPE_TAG) {
+ throw new AlgebricksException("Expected type INTERVAL for parameter 0 but got "
+ + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(bytes0[offset0]));
+ }
+
+ if (bytes1[offset1] != ATypeTag.SERIALIZED_INT32_TYPE_TAG) {
+ throw new AlgebricksException("Expected type INT32 for parameter 1 but got "
+ + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(bytes1[offset1]));
+ }
+
+ if (bytes2[offset2] != ATypeTag.SERIALIZED_INT32_TYPE_TAG) {
+ throw new AlgebricksException("Expected type INT32 for parameter 2 but got "
+ + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(bytes2[offset2]));
+ }
+
+ long point;
+ if (startPoint) {
+ point = AIntervalSerializerDeserializer.getIntervalStart(bytes0, offset0 + 1);
+ } else {
+ point = AIntervalSerializerDeserializer.getIntervalEnd(bytes0, offset0 + 1);
+ }
+ int rangeId = AInt32SerializerDeserializer.getInt(bytes1, offset1 + 1);
+ int k = AInt32SerializerDeserializer.getInt(bytes2, offset2 + 1);
+
+ if (rangeId != rangeIdCache) {
+ // Only load new values if the range changed.
+ RangeForwardTaskState rangeState = RangeForwardTaskState.getRangeState(rangeId, ctx);
+ IRangeMap rangeMap = rangeState.getRangeMap();
+ partitionStart = LongPointable.getLong(rangeMap.getMinByteArray(0), rangeMap.getMinStartOffset(0) + 1);
+ long partitionEnd = LongPointable.getLong(rangeMap.getMaxByteArray(0),
+ rangeMap.getMaxStartOffset(0) + 1);
+ partitionDuration = IntervalPartitionUtil.getPartitionDuration(partitionStart, partitionEnd, k);
+ rangeIdCache = rangeId;
+ }
+
+ int partition = IntervalPartitionUtil.getIntervalPartition(point, partitionStart, partitionDuration, k);
+ aInt.setValue(partition);
+ intSerde.serialize(aInt, out);
+ } catch (HyracksDataException hex) {
+ throw new AlgebricksException(hex);
+ }
+ result.set(resultStorage);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/IntervalPartitionJoinStartDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/IntervalPartitionJoinStartDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/IntervalPartitionJoinStartDescriptor.java
new file mode 100644
index 0000000..cc1d2f1
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/IntervalPartitionJoinStartDescriptor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.evaluators.functions.temporal;
+
+import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptor;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+public class IntervalPartitionJoinStartDescriptor extends AbstractScalarFunctionDynamicDescriptor {
+ private static final long serialVersionUID = 1L;
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ @Override
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new IntervalPartitionJoinStartDescriptor();
+ }
+ };
+
+ @Override
+ public IScalarEvaluatorFactory createEvaluatorFactory(final IScalarEvaluatorFactory[] args)
+ throws AlgebricksException {
+ return new IScalarEvaluatorFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws AlgebricksException {
+ return new IntervalPartitionJoinFunction(args, ctx, true);
+ }
+ };
+ }
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return AsterixBuiltinFunctions.INTERVAL_PARTITION_JOIN_START;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlappingIntervalMergeJoinCheckerFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlappingIntervalMergeJoinCheckerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlappingIntervalMergeJoinCheckerFactory.java
index a5f7770..880e181 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlappingIntervalMergeJoinCheckerFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlappingIntervalMergeJoinCheckerFactory.java
@@ -39,8 +39,7 @@ public class OverlappingIntervalMergeJoinCheckerFactory extends AbstractInterval
public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition,
IHyracksTaskContext ctx) throws HyracksDataException {
int fieldIndex = 0;
- RangeForwardTaskState rangeState = (RangeForwardTaskState) ctx
- .getStateObject(new RangeId(rangeId.getId(), ctx));
+ RangeForwardTaskState rangeState = RangeForwardTaskState.getRangeState(rangeId.getId(), ctx);
IRangeMap rangeMap = rangeState.getRangeMap();
if (ATypeTag.INT64.serialize() != rangeMap.getTag(0, 0)) {
throw new HyracksDataException("Invalid range map type for interval merge join checker.");
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionComputerFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionComputerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionComputerFactory.java
index d09a941..38ab073 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionComputerFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionComputerFactory.java
@@ -18,9 +18,6 @@
*/
package org.apache.asterix.runtime.operators.joins.intervalpartition;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
import org.apache.asterix.runtime.operators.joins.IntervalJoinUtil;
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
@@ -34,24 +31,12 @@ public class IntervalPartitionComputerFactory implements ITuplePartitionComputer
private final long partitionStart;
private final long partitionDuration;
- private static final Logger LOGGER = Logger.getLogger(IntervalPartitionComputerFactory.class.getName());
-
public IntervalPartitionComputerFactory(int intervalFieldId, int k, long partitionStart, long partitionEnd)
throws HyracksDataException {
this.intervalFieldId = intervalFieldId;
this.k = k;
this.partitionStart = partitionStart;
- if (k <= 2) {
- throw new HyracksDataException("k is to small for interval partitioner.");
- }
- long duration = (partitionEnd - partitionStart) / (k - 2);
- if (duration <= 0) {
- duration = 1;
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.fine("The interval partitioner using the smallest duration (1).");
- }
- }
- partitionDuration = duration;
+ this.partitionDuration = IntervalPartitionUtil.getPartitionDuration(partitionStart, partitionEnd, k);
}
@Override
@@ -66,12 +51,7 @@ public class IntervalPartitionComputerFactory implements ITuplePartitionComputer
}
private int getIntervalPartition(long point) throws HyracksDataException {
- if (point < partitionStart) {
- return 0;
- }
- long pointFloor = Math.floorDiv(point - partitionStart, partitionDuration);
- // Add one to the partition, since 0 represents any point before the start partition point.
- return (int) Math.min(pointFloor + 1, k - 1L);
+ return IntervalPartitionUtil.getIntervalPartition(point, partitionStart, partitionDuration, k);
}
public int getIntervalPartitionI(IFrameTupleAccessor accessor, int tIndex, int fieldId)
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
index 4e1850c..6ea1e6f 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
@@ -58,11 +58,8 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes
private final int[] probeKeys;
private final int[] buildKeys;
- private final long probeTupleCount;
- private final long probeMaxDuration;
- private final long buildTupleCount;
- private final long buildMaxDuration;
- private final int avgTuplesPerFrame;
+ private final int k;
+
private final int probeKey;
private final int buildKey;
private final IIntervalMergeJoinCheckerFactory imjcf;
@@ -70,19 +67,14 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes
private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoinOperatorDescriptor.class.getName());
- public IntervalPartitionJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, long leftTupleCount,
- long rightTupleCount, long leftMaxDuration, long rightMaxDuration, int avgTuplesPerFrame, int[] leftKeys,
+ public IntervalPartitionJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int k, int[] leftKeys,
int[] rightKeys, RecordDescriptor recordDescriptor, IIntervalMergeJoinCheckerFactory imjcf,
RangeId rangeId) {
super(spec, 2, 1);
this.memsize = memsize;
this.buildKey = leftKeys[0];
this.probeKey = rightKeys[0];
- this.buildTupleCount = leftTupleCount;
- this.probeTupleCount = rightTupleCount;
- this.buildMaxDuration = leftMaxDuration;
- this.probeMaxDuration = rightMaxDuration;
- this.avgTuplesPerFrame = avgTuplesPerFrame;
+ this.k = k;
this.buildKeys = leftKeys;
this.probeKeys = rightKeys;
recordDescriptors[0] = recordDescriptor;
@@ -136,8 +128,6 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes
final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(probeAid, 0);
- final int k = IntervalPartitionUtil.determineK(buildTupleCount, buildMaxDuration, probeTupleCount,
- probeMaxDuration, avgTuplesPerFrame);
return new AbstractUnaryInputSinkOperatorNodePushable() {
private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(
@@ -152,16 +142,8 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes
throw new HyracksDataException("not enough memory for join");
}
state.k = k;
- if (k <= 2) {
- state.k = 3;
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("IntervalPartitionJoin has overridden the suggested value of k (" + state.k
- + ") with 3.");
- }
- }
- RangeForwardTaskState rangeState = (RangeForwardTaskState) ctx
- .getStateObject(new RangeId(rangeId.getId(), ctx));
+ RangeForwardTaskState rangeState = RangeForwardTaskState.getRangeState(rangeId.getId(), ctx);
long partitionStart = IntervalPartitionUtil.getStartOfPartition(rangeState.getRangeMap(),
partition);
long partitionEnd = IntervalPartitionUtil.getEndOfPartition(rangeState.getRangeMap(), partition);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java
index 415feae..453287d 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java
@@ -29,6 +29,7 @@ import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinChecker;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.primitive.LongPointable;
public class IntervalPartitionUtil {
@@ -232,4 +233,25 @@ public class IntervalPartitionUtil {
return inMemoryMap;
}
+ /**
+ * Computes the slot duration obtained by dividing the span between the two bounds over {@code k - 2} slots.
+ * <p>
+ * The result is never smaller than 1. The span is evaluated as an unsigned 64-bit value, so bounds that are
+ * further apart than {@code Long.MAX_VALUE} (for example {@code Long.MIN_VALUE} and {@code Long.MAX_VALUE})
+ * no longer wrap around to a negative span and silently collapse the duration to 1.
+ *
+ * @param partitionStart
+ * lower bound of the partition range
+ * @param partitionEnd
+ * upper bound of the partition range
+ * @param k
+ * number of slots; must be greater than 2
+ * @return the slot duration, always in {@code [1, Long.MAX_VALUE]}
+ * @throws HyracksDataException
+ * if {@code k <= 2}
+ */
+ public static long getPartitionDuration(long partitionStart, long partitionEnd, int k) throws HyracksDataException {
+ if (k <= 2) {
+ throw new HyracksDataException("k is to small for interval partitioner.");
+ }
+ if (partitionEnd <= partitionStart) {
+ // Empty or inverted range: fall back to the smallest usable duration.
+ return 1;
+ }
+ // Here the true span lies in (0, 2^64 - 1], so the (possibly wrapped) difference is exact when read as unsigned.
+ long duration = Long.divideUnsigned(partitionEnd - partitionStart, k - 2L);
+ if (duration < 0) {
+ // The unsigned quotient exceeds Long.MAX_VALUE (only possible when k == 3): saturate.
+ return Long.MAX_VALUE;
+ }
+ return duration == 0 ? 1 : duration;
+ }
+
+ /**
+ * Maps a point to a slot index in {@code [0, k - 1]}.
+ * <p>
+ * Slot 0 is returned for any point before {@code partitionStart}. Otherwise the index is
+ * {@code floor((point - partitionStart) / partitionDuration) + 1}, clamped to {@code k - 1}. The offset is handled
+ * without signed overflow, so a point far above {@code partitionStart} lands in the last slot instead of
+ * producing a negative or zero index.
+ *
+ * @param point
+ * the value to place
+ * @param partitionStart
+ * lower bound of the partition range
+ * @param partitionDuration
+ * slot width; expected to be positive (zero raises {@link ArithmeticException})
+ * @param k
+ * number of slots
+ * @return the slot index of {@code point}
+ * @throws HyracksDataException
+ * declared for interface compatibility; not thrown by this implementation
+ */
+ public static int getIntervalPartition(long point, long partitionStart, long partitionDuration, int k)
+ throws HyracksDataException {
+ if (point < partitionStart) {
+ return 0;
+ }
+ long offset = point - partitionStart;
+ long pointFloor;
+ if (offset >= 0) {
+ pointFloor = Math.floorDiv(offset, partitionDuration);
+ } else {
+ // point >= partitionStart, so a negative difference means the subtraction wrapped; the true offset is
+ // the same bits read as an unsigned value.
+ pointFloor = Long.divideUnsigned(offset, partitionDuration);
+ if (pointFloor < 0) {
+ // The unsigned quotient itself exceeds Long.MAX_VALUE: certainly past the last slot.
+ return k - 1;
+ }
+ }
+ // Clamp before adding one so that pointFloor == Long.MAX_VALUE cannot overflow.
+ if (pointFloor >= k - 2L) {
+ return k - 1;
+ }
+ // Add one to the partition, since 0 represents any point before the start partition point.
+ return (int) (pointFloor + 1);
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/functions/FunctionIdentifier.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/functions/FunctionIdentifier.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/functions/FunctionIdentifier.java
index 2f2b7e3..fc43ecb 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/functions/FunctionIdentifier.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/functions/FunctionIdentifier.java
@@ -27,7 +27,7 @@ public final class FunctionIdentifier implements Serializable {
private final String name;
private final int arity;
- public final static int VARARGS = -1;
+ public static final int VARARGS = -1;
public FunctionIdentifier(String namespace, String name) {
this(namespace, name, VARARGS);
@@ -60,6 +60,7 @@ public final class FunctionIdentifier implements Serializable {
return name.hashCode() + namespace.hashCode();
}
+ @Override
public String toString() {
return getNamespace() + ":" + name;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index 873b847..93d878c 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -290,6 +290,7 @@ public class SchemaVariableVisitor implements ILogicalOperatorVisitor<Void, Void
@Override
public Void visitRangeForwardOperator(RangeForwardOperator op, Void arg) throws AlgebricksException {
+ standardLayout(op);
return null;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index f3adffd..5735e9a 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -585,8 +585,7 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
List<ILocalStructuralProperty> cldLocals = deliveredByChild.getLocalProperties();
List<ILocalStructuralProperty> reqdLocals = required.getLocalProperties();
- // Add RangeForwardOperator.
- addRangeForwardOperator(op.getInputs().get(i), opp.getRangeId(), opp.getRangeMapHint(), context);
+ // The RangeForwardOperator should already be in the plan.
boolean propWasSet = false;
pop = null;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroduceProjectsRule.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroduceProjectsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroduceProjectsRule.java
index 6bc129e..c670b6b 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroduceProjectsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroduceProjectsRule.java
@@ -48,10 +48,10 @@ import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
*/
public class IntroduceProjectsRule implements IAlgebraicRewriteRule {
- private final Set<LogicalVariable> usedVars = new HashSet<LogicalVariable>();
- private final Set<LogicalVariable> liveVars = new HashSet<LogicalVariable>();
- private final Set<LogicalVariable> producedVars = new HashSet<LogicalVariable>();
- private final List<LogicalVariable> projectVars = new ArrayList<LogicalVariable>();
+ private final Set<LogicalVariable> usedVars = new HashSet<>();
+ private final Set<LogicalVariable> liveVars = new HashSet<>();
+ private final Set<LogicalVariable> producedVars = new HashSet<>();
+ private final List<LogicalVariable> projectVars = new ArrayList<>();
protected boolean hasRun = false;
@Override
@@ -78,7 +78,7 @@ public class IntroduceProjectsRule implements IAlgebraicRewriteRule {
VariableUtilities.getUsedVariables(op, usedVars);
// In the top-down pass, maintain a set of variables that are used in op and all its parents.
- HashSet<LogicalVariable> parentsUsedVars = new HashSet<LogicalVariable>();
+ HashSet<LogicalVariable> parentsUsedVars = new HashSet<>();
parentsUsedVars.addAll(parentUsedVars);
parentsUsedVars.addAll(usedVars);
@@ -115,7 +115,7 @@ public class IntroduceProjectsRule implements IAlgebraicRewriteRule {
ILogicalOperator childOp = op.getInputs().get(i).getValue();
liveVars.clear();
VariableUtilities.getLiveVariables(childOp, liveVars);
- List<LogicalVariable> vars = new ArrayList<LogicalVariable>();
+ List<LogicalVariable> vars = new ArrayList<>();
vars.addAll(projectVars);
// Only retain those variables that are live in the i-th input branch.
vars.retainAll(liveVars);
@@ -132,8 +132,8 @@ public class IntroduceProjectsRule implements IAlgebraicRewriteRule {
liveVars.clear();
VariableUtilities.getLiveVariables(op.getInputs().get(0).getValue(), liveVars);
ProjectOperator projectOp = (ProjectOperator) op;
- List<LogicalVariable> projectVars = projectOp.getVariables();
- if (liveVars.size() == projectVars.size() && liveVars.containsAll(projectVars)) {
+ List<LogicalVariable> projectVarsTemp = projectOp.getVariables();
+ if (liveVars.size() == projectVarsTemp.size() && liveVars.containsAll(projectVarsTemp)) {
boolean eliminateProject = true;
// For UnionAll the variables must also be in exactly the correct order.
if (parentOp.getOperatorTag() == LogicalOperatorTag.UNIONALL) {
@@ -155,7 +155,7 @@ public class IntroduceProjectsRule implements IAlgebraicRewriteRule {
private boolean canEliminateProjectBelowUnion(UnionAllOperator unionOp, ProjectOperator projectOp,
int unionInputIndex) throws AlgebricksException {
- List<LogicalVariable> orderedLiveVars = new ArrayList<LogicalVariable>();
+ List<LogicalVariable> orderedLiveVars = new ArrayList<>();
VariableUtilities.getLiveVariables(projectOp.getInputs().get(0).getValue(), orderedLiveVars);
int numVars = orderedLiveVars.size();
for (int i = 0; i < numVars; i++) {