You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by pr...@apache.org on 2016/10/17 19:55:11 UTC
[21/50] [abbrv] asterixdb git commit: snapshot for range state transition
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
index d6cd363..9ca536b 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
@@ -82,7 +82,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
super(ctx, partition, status, locks, leftRd, rightRd);
this.point = imjcf.isOrderAsc() ? EndPointIndexItem.START_POINT : EndPointIndexItem.END_POINT;
- this.imjc = imjcf.createMergeJoinChecker(leftKeys, rightKeys, partition);
+ this.imjc = imjcf.createMergeJoinChecker(leftKeys, rightKeys, partition, null);
this.leftKey = leftKeys[0];
this.rightKey = rightKeys[0];
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
index 0dd358c..8c4c43d 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
@@ -37,12 +37,13 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.base.RangeId;
+import org.apache.hyracks.dataflow.std.misc.RangeForwardOperatorDescriptor.RangeForwardTaskState;
public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDescriptor {
private static final long serialVersionUID = 1L;
@@ -65,14 +66,14 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes
private final int probeKey;
private final int buildKey;
private final IIntervalMergeJoinCheckerFactory imjcf;
- private final IRangeMap rangeMap;
+ private final RangeId rangeId;
private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoinOperatorDescriptor.class.getName());
public IntervalPartitionJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, long leftTupleCount,
long rightTupleCount, long leftMaxDuration, long rightMaxDuration, int avgTuplesPerFrame, int[] leftKeys,
int[] rightKeys, RecordDescriptor recordDescriptor, IIntervalMergeJoinCheckerFactory imjcf,
- IRangeMap rangeMap) {
+ RangeId rangeId) {
super(spec, 2, 1);
this.memsize = memsize;
this.buildKey = leftKeys[0];
@@ -86,7 +87,7 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes
this.probeKeys = rightKeys;
recordDescriptors[0] = recordDescriptor;
this.imjcf = imjcf;
- this.rangeMap = rangeMap;
+ this.rangeId = rangeId;
}
@Override
@@ -137,8 +138,6 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes
final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(probeAid, 0);
final int k = IntervalPartitionUtil.determineK(buildTupleCount, buildMaxDuration, probeTupleCount,
probeMaxDuration, avgTuplesPerFrame);
- final long partitionStart = IntervalPartitionUtil.getStartOfPartition(rangeMap, partition);
- final long partitionEnd = IntervalPartitionUtil.getEndOfPartition(rangeMap, partition);
return new AbstractUnaryInputSinkOperatorNodePushable() {
private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(
@@ -160,6 +159,10 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes
+ ") with 3.");
}
}
+
+ RangeForwardTaskState rangeState = (RangeForwardTaskState) ctx.getStateObject(rangeId);
+ long partitionStart = IntervalPartitionUtil.getStartOfPartition(rangeState.getRangeMap(), partition);
+ long partitionEnd = IntervalPartitionUtil.getEndOfPartition(rangeState.getRangeMap(), partition);
ITuplePartitionComputer buildHpc = new IntervalPartitionComputerFactory(buildKey, k, partitionStart,
partitionEnd).createPartitioner();
ITuplePartitionComputer probeHpc = new IntervalPartitionComputerFactory(probeKey, k, partitionStart,
@@ -168,7 +171,7 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes
state.partition = partition;
state.intervalPartitions = IntervalPartitionUtil.getMaxPartitions(state.k);
state.memoryForJoin = memsize;
- IIntervalMergeJoinChecker imjc = imjcf.createMergeJoinChecker(buildKeys, probeKeys, partition);
+ IIntervalMergeJoinChecker imjc = imjcf.createMergeJoinChecker(buildKeys, probeKeys, partition, rangeState.getRangeMap());
state.ipj = new IntervalPartitionJoiner(ctx, state.memoryForJoin, state.k, state.intervalPartitions,
BUILD_REL, PROBE_REL, imjc, buildRd, probeRd, buildHpc, probeHpc);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java
index 4c80ba8..415feae 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java
@@ -28,8 +28,8 @@ import java.util.Map.Entry;
import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinChecker;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import org.apache.hyracks.api.dataflow.value.IRangeMap;
import org.apache.hyracks.data.std.primitive.LongPointable;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
public class IntervalPartitionUtil {
public static final double C_CPU = 0.5;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
index 328697d..dfd36a2 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
@@ -33,7 +33,7 @@ import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
-public interface IOptimizationContext extends ITypingContext, IVariableContext {
+public interface IOptimizationContext extends ITypingContext, IVariableContext, IRangeContext {
@Override
public IMetadataProvider<?, ?> getMetadataProvider();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IRangeContext.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IRangeContext.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IRangeContext.java
new file mode 100644
index 0000000..9a70b7a
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IRangeContext.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.base;
+
+import org.apache.hyracks.dataflow.std.base.RangeId;
+
+public interface IRangeContext {
+
+ public RangeId newRangeId();
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/RangeForwardOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/RangeForwardOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/RangeForwardOperator.java
index 5a3bc98..58c7a80 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/RangeForwardOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/RangeForwardOperator.java
@@ -28,16 +28,23 @@ import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagation
import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
+import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.dataflow.std.base.RangeId;
public class RangeForwardOperator extends AbstractLogicalOperator {
+ private RangeId rangeId;
private IRangeMap rangeMap;
- public RangeForwardOperator(IRangeMap rangeMap) {
+ public RangeForwardOperator(RangeId rangeId, IRangeMap rangeMap) {
+ this.rangeId = rangeId;
this.rangeMap = rangeMap;
}
+ public RangeId getRangeId() {
+ return rangeId;
+ }
+
public IRangeMap getRangeMap() {
return rangeMap;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
index 8cb739a..7a5a89a 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
@@ -410,7 +410,7 @@ public class LogicalOperatorDeepCopyWithNewVariablesVisitor
public ILogicalOperator visitRangeForwardOperator(RangeForwardOperator op, ILogicalOperator arg)
throws AlgebricksException {
// TODO fix deep copy of range map
- RangeForwardOperator opCopy = new RangeForwardOperator(op.getRangeMap());
+ RangeForwardOperator opCopy = new RangeForwardOperator(op.getRangeId(), op.getRangeMap());
deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
return opCopy;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index 6337187..fea0431 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -390,7 +390,7 @@ public class OperatorDeepCopyVisitor implements ILogicalOperatorVisitor<ILogical
@Override
public ILogicalOperator visitRangeForwardOperator(RangeForwardOperator op, Void arg) throws AlgebricksException {
- return new RangeForwardOperator(op.getRangeMap());
+ return new RangeForwardOperator(op.getRangeId(), op.getRangeMap());
}
@Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
index 34c707b..91dba24 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
@@ -100,7 +100,7 @@ public class HashPartitionExchangePOperator extends AbstractExchangePOperator {
}
ITuplePartitionComputerFactory tpcf = new FieldHashPartitionComputerFactory(keys, hashFunctionFactories);
IConnectorDescriptor conn = new MToNPartitioningConnectorDescriptor(spec, tpcf);
- return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
+ return new Pair<>(conn, null);
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MergeJoinPOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MergeJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MergeJoinPOperator.java
index 51f54f6..c24f3c8 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MergeJoinPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MergeJoinPOperator.java
@@ -44,9 +44,10 @@ import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertie
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType;
+import org.apache.hyracks.dataflow.std.base.RangeId;
import org.apache.hyracks.dataflow.std.join.IMergeJoinCheckerFactory;
import org.apache.hyracks.dataflow.std.join.MergeJoinOperatorDescriptor;
@@ -56,23 +57,28 @@ public class MergeJoinPOperator extends AbstractJoinPOperator {
protected final List<LogicalVariable> keysLeftBranch;
protected final List<LogicalVariable> keysRightBranch;
private final IMergeJoinCheckerFactory mjcf;
- private IRangeMap rangeMap;
+ private final RangeId leftRangeId;
+ private final RangeId rightRangeId;
+ private final IRangeMap rangeMapHint;
private static final Logger LOGGER = Logger.getLogger(MergeJoinPOperator.class.getName());
public MergeJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType, List<LogicalVariable> sideLeft,
- List<LogicalVariable> sideRight, int memSizeInFrames, IMergeJoinCheckerFactory mjcf, IRangeMap rangeMap) {
+ List<LogicalVariable> sideRight, int memSizeInFrames, IMergeJoinCheckerFactory mjcf, RangeId leftRangeId,
+ RangeId rightRangeId, IRangeMap rangeMapHint) {
super(kind, partitioningType);
this.memSizeInFrames = memSizeInFrames;
this.keysLeftBranch = sideLeft;
this.keysRightBranch = sideRight;
this.mjcf = mjcf;
- this.rangeMap = rangeMap;
+ this.leftRangeId = leftRangeId;
+ this.rightRangeId = rightRangeId;
+ this.rangeMapHint = rangeMapHint;
LOGGER.fine("MergeJoinPOperator constructed with: JoinKind=" + kind + ", JoinPartitioningType="
+ partitioningType + ", List<LogicalVariable>=" + keysLeftBranch + ", List<LogicalVariable>="
+ keysRightBranch + ", int memSizeInFrames=" + memSizeInFrames + ", IMergeJoinCheckerFactory mjcf="
- + mjcf + ", IRangeMap rangeMap=" + rangeMap + ".");
+ + mjcf + ", RangeId leftRangeId=" + leftRangeId + ", RangeId rightRangeId=" + rightRangeId + ".");
}
public List<LogicalVariable> getKeysLeftBranch() {
@@ -87,8 +93,8 @@ public class MergeJoinPOperator extends AbstractJoinPOperator {
return mjcf;
}
- public IRangeMap getRangeMap() {
- return rangeMap;
+ public RangeId getRangeId() {
+ return leftRangeId;
}
@Override
@@ -107,7 +113,8 @@ public class MergeJoinPOperator extends AbstractJoinPOperator {
for (LogicalVariable v : keysLeftBranch) {
order.add(new OrderColumn(v, mjcf.isOrderAsc() ? OrderKind.ASC : OrderKind.DESC));
}
- IPartitioningProperty pp = new OrderedPartitionedProperty(order, null, rangeMap, RangePartitioningType.PROJECT);
+ IPartitioningProperty pp = new OrderedPartitionedProperty(order, null, leftRangeId, RangePartitioningType.PROJECT,
+ rangeMapHint);
List<ILocalStructuralProperty> propsLocal = new ArrayList<>();
propsLocal.add(new LocalOrderProperty(order));
deliveredProperties = new StructuralPropertiesVector(pp, propsLocal);
@@ -137,8 +144,10 @@ public class MergeJoinPOperator extends AbstractJoinPOperator {
ispRight.add(new LocalOrderProperty(orderRight));
if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
- ppLeft = new OrderedPartitionedProperty(orderLeft, null, rangeMap, mjcf.getLeftPartitioningType());
- ppRight = new OrderedPartitionedProperty(orderRight, null, rangeMap, mjcf.getRightPartitioningType());
+ ppLeft = new OrderedPartitionedProperty(orderLeft, null, leftRangeId, mjcf.getLeftPartitioningType(),
+ rangeMapHint);
+ ppRight = new OrderedPartitionedProperty(orderRight, null, rightRangeId, mjcf.getRightPartitioningType(),
+ rangeMapHint);
}
pv[0] = new StructuralPropertiesVector(ppLeft, ispLeft);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangeForwardPOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangeForwardPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangeForwardPOperator.java
index 2324c5e..3ace793 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangeForwardPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangeForwardPOperator.java
@@ -29,18 +29,28 @@ import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalProperties
import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
+import org.apache.hyracks.dataflow.std.base.RangeId;
import org.apache.hyracks.dataflow.std.misc.RangeForwardOperatorDescriptor;
public class RangeForwardPOperator extends AbstractPhysicalOperator {
+ private RangeId rangeId;
private IRangeMap rangeMap;
- public RangeForwardPOperator(IRangeMap rangeMap) {
+ public RangeForwardPOperator(RangeId rangeId, IRangeMap rangeMap) {
+ // Use when a range hint is provided.
+ this.rangeId = rangeId;
this.rangeMap = rangeMap;
}
+ public RangeForwardPOperator(RangeId rangeId) {
+ this(rangeId, null);
+ }
+
public IRangeMap getRangeMap() {
return rangeMap;
}
@@ -72,7 +82,10 @@ public class RangeForwardPOperator extends AbstractPhysicalOperator {
IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
throws AlgebricksException {
IOperatorDescriptorRegistry spec = builder.getJobSpec();
- RangeForwardOperatorDescriptor opDesc = new RangeForwardOperatorDescriptor(spec, rangeMap);
+ RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema,
+ context);
+ RangeForwardOperatorDescriptor opDesc = new RangeForwardOperatorDescriptor(spec, rangeId, rangeMap,
+ recordDescriptor);
contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
ILogicalOperator src = op.getInputs().get(0).getValue();
builder.contributeGraphEdge(src, 0, op, 0);
@@ -82,4 +95,10 @@ public class RangeForwardPOperator extends AbstractPhysicalOperator {
public boolean expensiveThanMaterialization() {
return false;
}
+
+ @Override
+ public String toString() {
+ return getOperatorTag().toString() + " " + rangeId;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
index 8299f61..c58bb0c 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
@@ -48,25 +48,25 @@ import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.IBinaryRangeComparatorFactory;
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
import org.apache.hyracks.api.dataflow.value.ITupleRangePartitionComputerFactory;
import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
import org.apache.hyracks.dataflow.common.data.partition.range.FieldRangePartitionComputerFactory;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType;
+import org.apache.hyracks.dataflow.std.base.RangeId;
import org.apache.hyracks.dataflow.std.connectors.MToNRangePartitioningConnectorDescriptor;
public class RangePartitionExchangePOperator extends AbstractExchangePOperator {
private List<OrderColumn> partitioningFields;
private INodeDomain domain;
- private IRangeMap rangeMap;
+ private RangeId rangeId;
private RangePartitioningType rangeType;
- public RangePartitionExchangePOperator(List<OrderColumn> partitioningFields, INodeDomain domain, IRangeMap rangeMap,
+ public RangePartitionExchangePOperator(List<OrderColumn> partitioningFields, INodeDomain domain, RangeId rangeId,
RangePartitioningType rangeType) {
this.partitioningFields = partitioningFields;
this.domain = domain;
- this.rangeMap = rangeMap;
+ this.rangeId = rangeId;
this.rangeType = rangeType;
}
@@ -83,8 +83,8 @@ public class RangePartitionExchangePOperator extends AbstractExchangePOperator {
return rangeType;
}
- public IRangeMap getRangeMap() {
- return rangeMap;
+ public RangeId getRangeId() {
+ return rangeId;
}
public INodeDomain getDomain() {
@@ -94,7 +94,7 @@ public class RangePartitionExchangePOperator extends AbstractExchangePOperator {
@Override
public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
IPartitioningProperty p = new OrderedPartitionedProperty(new ArrayList<OrderColumn>(partitioningFields), domain,
- rangeMap, rangeType);
+ rangeId, rangeType, null);
AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
List<ILocalStructuralProperty> op2Locals = op2.getDeliveredPhysicalProperties().getLocalProperties();
List<ILocalStructuralProperty> locals = new ArrayList<>();
@@ -141,15 +141,15 @@ public class RangePartitionExchangePOperator extends AbstractExchangePOperator {
i++;
}
ITupleRangePartitionComputerFactory tpcf = new FieldRangePartitionComputerFactory(sortFields, rangeComps,
- rangeMap, rangeType);
- IConnectorDescriptor conn = new MToNRangePartitioningConnectorDescriptor(spec, tpcf, sortFields, binaryComps,
+ rangeType);
+ IConnectorDescriptor conn = new MToNRangePartitioningConnectorDescriptor(spec, tpcf, rangeId, sortFields, binaryComps,
nkcf);
return new Pair<>(conn, null);
}
@Override
public String toString() {
- return getOperatorTag().toString() + " " + partitioningFields + " " + rangeType;
+ return getOperatorTag().toString() + " " + partitioningFields + " " + rangeType + " " + rangeId;
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java
index 9667427..31cd37d 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java
@@ -51,25 +51,25 @@ import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.IBinaryRangeComparatorFactory;
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
import org.apache.hyracks.api.dataflow.value.ITupleRangePartitionComputerFactory;
import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
import org.apache.hyracks.dataflow.common.data.partition.range.FieldRangePartitionComputerFactory;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType;
+import org.apache.hyracks.dataflow.std.base.RangeId;
import org.apache.hyracks.dataflow.std.connectors.MToNRangePartitionMergingConnectorDescriptor;
public class RangePartitionMergeExchangePOperator extends AbstractExchangePOperator {
private List<OrderColumn> partitioningFields;
private INodeDomain domain;
- private IRangeMap rangeMap;
+ private RangeId rangeId;
private RangePartitioningType rangeType;
public RangePartitionMergeExchangePOperator(List<OrderColumn> partitioningFields, INodeDomain domain,
- IRangeMap rangeMap, RangePartitioningType rangeType) {
+ RangeId rangeId, RangePartitioningType rangeType) {
this.partitioningFields = partitioningFields;
this.domain = domain;
- this.rangeMap = rangeMap;
+ this.rangeId = rangeId;
this.rangeType = rangeType;
}
@@ -86,8 +86,8 @@ public class RangePartitionMergeExchangePOperator extends AbstractExchangePOpera
return rangeType;
}
- public IRangeMap getRangeMap() {
- return rangeMap;
+ public RangeId getRangeId() {
+ return rangeId;
}
public INodeDomain getDomain() {
@@ -96,7 +96,7 @@ public class RangePartitionMergeExchangePOperator extends AbstractExchangePOpera
@Override
public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
- IPartitioningProperty p = new OrderedPartitionedProperty(partitioningFields, domain, rangeMap, rangeType);
+ IPartitioningProperty p = new OrderedPartitionedProperty(partitioningFields, domain, rangeId, rangeType, null);
AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
List<ILocalStructuralProperty> op2Locals = op2.getDeliveredPhysicalProperties().getLocalProperties();
List<ILocalStructuralProperty> locals = new ArrayList<>();
@@ -152,15 +152,15 @@ public class RangePartitionMergeExchangePOperator extends AbstractExchangePOpera
i++;
}
ITupleRangePartitionComputerFactory tpcf = new FieldRangePartitionComputerFactory(sortFields, rangeComps,
- rangeMap, rangeType);
- IConnectorDescriptor conn = new MToNRangePartitionMergingConnectorDescriptor(spec, tpcf, sortFields,
+ rangeType);
+ IConnectorDescriptor conn = new MToNRangePartitionMergingConnectorDescriptor(spec, tpcf, rangeId, sortFields,
binaryComps, nkcf);
return new Pair<>(conn, null);
}
@Override
public String toString() {
- return getOperatorTag().toString() + " " + partitioningFields + " " + rangeType;
+ return getOperatorTag().toString() + " " + partitioningFields + " " + rangeType + " " + rangeId;
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
index c1dcd97..040e663 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
@@ -25,29 +25,27 @@ import java.util.Map;
import org.apache.hyracks.algebricks.core.algebra.base.EquivalenceClass;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType;
+import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
+import org.apache.hyracks.dataflow.std.base.RangeId;
public class OrderedPartitionedProperty implements IPartitioningProperty {
private List<OrderColumn> orderColumns;
private INodeDomain domain;
- private IRangeMap rangeMap;
+ private RangeId rangeId;
private RangePartitioningType rangeType;
- public OrderedPartitionedProperty(List<OrderColumn> orderColumns, INodeDomain domain, IRangeMap rangeMap,
- RangePartitioningType rangeType) {
+ public OrderedPartitionedProperty(List<OrderColumn> orderColumns, INodeDomain domain, RangeId rangeId,
+ RangePartitioningType rangeType, IRangeMap rangeMapHint) { // NOTE(review): rangeMapHint is not stored or read in this constructor body
this.domain = domain;
this.orderColumns = orderColumns;
- this.rangeMap = rangeMap;
+ this.rangeId = rangeId;
this.rangeType = rangeType;
}
- public OrderedPartitionedProperty(List<OrderColumn> orderColumns, INodeDomain domain, IRangeMap rangeMap) {
- this.domain = domain;
- this.orderColumns = orderColumns;
- this.rangeMap = rangeMap;
- this.rangeType = RangePartitioningType.PROJECT;
+ public OrderedPartitionedProperty(List<OrderColumn> orderColumns, INodeDomain domain, RangeId rangeId) {
+ this(orderColumns, domain, rangeId, RangePartitioningType.PROJECT, null);
}
public List<OrderColumn> getOrderColumns() {
@@ -81,7 +79,7 @@ public class OrderedPartitionedProperty implements IPartitioningProperty {
List<FunctionalDependency> fds) {
List<OrderColumn> columns = PropertiesUtil.replaceOrderColumnsByEqClasses(orderColumns, equivalenceClasses);
columns = PropertiesUtil.applyFDsToOrderColumns(columns, fds);
- return new OrderedPartitionedProperty(columns, domain, rangeMap);
+ return new OrderedPartitionedProperty(columns, domain, rangeId);
}
@Override
@@ -91,8 +89,8 @@ public class OrderedPartitionedProperty implements IPartitioningProperty {
}
}
- public IRangeMap getRangeMap() {
- return rangeMap;
+ public RangeId getRangeId() {
+ return rangeId;
}
@Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
index d3e21f7..6400dd1 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
@@ -161,11 +161,11 @@ public class PropertiesUtil {
if (mayExpandProperties) {
return (isPrefixOf(od.getOrderColumns().iterator(), or.getOrderColumns().iterator())
&& or.getRangePartitioningType().equals(od.getRangePartitioningType())
- && or.getRangeMap().equals(od.getRangeMap()));
+ && or.getRangeId().equals(od.getRangeId()));
} else {
return (or.getOrderColumns().equals(od.getOrderColumns())
&& or.getRangePartitioningType().equals(od.getRangePartitioningType())
- && or.getRangeMap().equals(od.getRangeMap()));
+ && or.getRangeId().equals(od.getRangeId()));
}
}
default: {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
index 86f61ad..12b9c66 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
@@ -42,17 +42,19 @@ import org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDom
import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
import org.apache.hyracks.algebricks.core.algebra.properties.ILogicalPropertiesVector;
import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import org.apache.hyracks.dataflow.std.base.RangeId;
@SuppressWarnings({ "unchecked", "rawtypes" })
public class AlgebricksOptimizationContext implements IOptimizationContext {
private int varCounter;
+ private int rangeIdCounter;
private final IExpressionEvalSizeComputer expressionEvalSizeComputer;
private final IMergeAggregationExpressionFactory mergeAggregationExpressionFactory;
private final PhysicalOptimizationConfig physicalOptimizationConfig;
private final IVariableEvalSizeEnvironment varEvalSizeEnv = new IVariableEvalSizeEnvironment() {
- Map<LogicalVariable, Integer> varSizeMap = new HashMap<LogicalVariable, Integer>();
+ Map<LogicalVariable, Integer> varSizeMap = new HashMap<>();
@Override
public void setVariableEvalSize(LogicalVariable var, int size) {
@@ -65,19 +67,19 @@ public class AlgebricksOptimizationContext implements IOptimizationContext {
}
};
- private Map<ILogicalOperator, IVariableTypeEnvironment> typeEnvMap = new HashMap<ILogicalOperator, IVariableTypeEnvironment>();
+ private Map<ILogicalOperator, IVariableTypeEnvironment> typeEnvMap = new HashMap<>();
- private Map<ILogicalOperator, HashSet<ILogicalOperator>> alreadyCompared = new HashMap<ILogicalOperator, HashSet<ILogicalOperator>>();
- private Map<IAlgebraicRewriteRule, HashSet<ILogicalOperator>> dontApply = new HashMap<IAlgebraicRewriteRule, HashSet<ILogicalOperator>>();
- private Map<LogicalVariable, FunctionalDependency> varToPrimaryKey = new HashMap<LogicalVariable, FunctionalDependency>();
+ private Map<ILogicalOperator, HashSet<ILogicalOperator>> alreadyCompared = new HashMap<>();
+ private Map<IAlgebraicRewriteRule, HashSet<ILogicalOperator>> dontApply = new HashMap<>();
+ private Map<LogicalVariable, FunctionalDependency> varToPrimaryKey = new HashMap<>();
private IMetadataProvider metadataProvider;
- private HashSet<LogicalVariable> notToBeInlinedVars = new HashSet<LogicalVariable>();
+ private HashSet<LogicalVariable> notToBeInlinedVars = new HashSet<>();
- protected final Map<ILogicalOperator, List<FunctionalDependency>> fdGlobalMap = new HashMap<ILogicalOperator, List<FunctionalDependency>>();
- protected final Map<ILogicalOperator, Map<LogicalVariable, EquivalenceClass>> eqClassGlobalMap = new HashMap<ILogicalOperator, Map<LogicalVariable, EquivalenceClass>>();
+ protected final Map<ILogicalOperator, List<FunctionalDependency>> fdGlobalMap = new HashMap<>();
+ protected final Map<ILogicalOperator, Map<LogicalVariable, EquivalenceClass>> eqClassGlobalMap = new HashMap<>();
- protected final Map<ILogicalOperator, ILogicalPropertiesVector> logicalProps = new HashMap<ILogicalOperator, ILogicalPropertiesVector>();
+ protected final Map<ILogicalOperator, ILogicalPropertiesVector> logicalProps = new HashMap<>();
private final IExpressionTypeComputer expressionTypeComputer;
private final IMissableTypeComputer nullableTypeComputer;
private final INodeDomain defaultNodeDomain;
@@ -98,6 +100,7 @@ public class AlgebricksOptimizationContext implements IOptimizationContext {
PhysicalOptimizationConfig physicalOptimizationConfig, AlgebricksPartitionConstraint clusterLocations,
LogicalOperatorPrettyPrintVisitor prettyPrintVisitor) {
this.varCounter = varCounter;
+ this.rangeIdCounter = -1;
this.expressionEvalSizeComputer = expressionEvalSizeComputer;
this.mergeAggregationExpressionFactory = mergeAggregationExpressionFactory;
this.expressionTypeComputer = expressionTypeComputer;
@@ -125,6 +128,13 @@ public class AlgebricksOptimizationContext implements IOptimizationContext {
}
@Override
+ public RangeId newRangeId() {
+ // Advance the counter first, then wrap the new value; no temporary local is needed.
+ rangeIdCounter++;
+ return new RangeId(rangeIdCounter);
+ }
+
+ @Override
public IMetadataProvider getMetadataProvider() {
return metadataProvider;
}
@@ -148,7 +158,7 @@ public class AlgebricksOptimizationContext implements IOptimizationContext {
public void addToDontApplySet(IAlgebraicRewriteRule rule, ILogicalOperator op) {
HashSet<ILogicalOperator> operators = dontApply.get(rule);
if (operators == null) {
- HashSet<ILogicalOperator> os = new HashSet<ILogicalOperator>();
+ HashSet<ILogicalOperator> os = new HashSet<>();
os.add(op);
dontApply.put(rule, os);
} else {
@@ -164,7 +174,7 @@ public class AlgebricksOptimizationContext implements IOptimizationContext {
public boolean checkAndAddToAlreadyCompared(ILogicalOperator op1, ILogicalOperator op2) {
HashSet<ILogicalOperator> ops = alreadyCompared.get(op1);
if (ops == null) {
- HashSet<ILogicalOperator> newEntry = new HashSet<ILogicalOperator>();
+ HashSet<ILogicalOperator> newEntry = new HashSet<>();
newEntry.add(op2);
alreadyCompared.put(op1, newEntry);
return false;
@@ -204,9 +214,9 @@ public class AlgebricksOptimizationContext implements IOptimizationContext {
public List<LogicalVariable> findPrimaryKey(LogicalVariable recordVar) {
FunctionalDependency fd = varToPrimaryKey.get(recordVar);
if (fd == null) {
- return null;
+ return new ArrayList<>(); // NOTE(review): the no-key result changes from null to an empty list; callers that test for null should be checked
}
- return new ArrayList<LogicalVariable>(fd.getHead());
+ return new ArrayList<>(fd.getHead());
}
@Override
@@ -299,7 +309,7 @@ public class AlgebricksOptimizationContext implements IOptimizationContext {
public void updatePrimaryKeys(Map<LogicalVariable, LogicalVariable> mappedVars) {
for (Map.Entry<LogicalVariable, FunctionalDependency> me : varToPrimaryKey.entrySet()) {
FunctionalDependency fd = me.getValue();
- List<LogicalVariable> hd = new ArrayList<LogicalVariable>();
+ List<LogicalVariable> hd = new ArrayList<>();
for (LogicalVariable v : fd.getHead()) {
LogicalVariable v2 = mappedVars.get(v);
if (v2 == null) {
@@ -308,7 +318,7 @@ public class AlgebricksOptimizationContext implements IOptimizationContext {
hd.add(v2);
}
}
- List<LogicalVariable> tl = new ArrayList<LogicalVariable>();
+ List<LogicalVariable> tl = new ArrayList<>();
for (LogicalVariable v : fd.getTail()) {
LogicalVariable v2 = mappedVars.get(v);
if (v2 == null) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IBinaryComparatorFactoryProvider.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IBinaryComparatorFactoryProvider.java b/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IBinaryComparatorFactoryProvider.java
index 44e04b5..f6b1796 100644
--- a/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IBinaryComparatorFactoryProvider.java
+++ b/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IBinaryComparatorFactoryProvider.java
@@ -21,7 +21,7 @@ package org.apache.hyracks.algebricks.data;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.IBinaryRangeComparatorFactory;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType;
+import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
public interface IBinaryComparatorFactoryProvider {
public IBinaryComparatorFactory getBinaryComparatorFactory(Object type, boolean ascending)
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 31d1099..4ec5e27 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -49,6 +49,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOpera
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.RangeForwardOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.FDsAndEquivClassesVisitor;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractStableSortPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.BroadcastExchangePOperator;
@@ -60,6 +61,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreSortedDi
import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreclusteredGroupByPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.RandomMergeExchangePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.RandomPartitionExchangePOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangeForwardPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangePartitionExchangePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangePartitionMergeExchangePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.SortMergeExchangePOperator;
@@ -89,8 +91,9 @@ import org.apache.hyracks.algebricks.core.config.AlgebricksConfig;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
import org.apache.hyracks.algebricks.rewriter.util.PhysicalOptimizationsUtil;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType;
+import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
+import org.apache.hyracks.dataflow.std.base.RangeId;
public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
@@ -155,8 +158,8 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
}
// Gets the index of a child to start top-down data property enforcement.
- // If there is a partitioning-compatible child with the operator in opRef, start from this child;
- // otherwise, start from child zero.
+ // If there is a partitioning-compatible child with the operator in opRef,
+ // start from this child; otherwise, start from child zero.
private int getStartChildIndex(AbstractLogicalOperator op, PhysicalRequirements pr, boolean nestedPlan,
IOptimizationContext context) throws AlgebricksException {
IPhysicalPropertiesVector[] reqdProperties = null;
@@ -273,8 +276,7 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
changed = true;
addEnforcers(op, childIndex, diff, rqd, delivered, childrenDomain, nestedPlan, context);
- AbstractLogicalOperator newChild = (AbstractLogicalOperator) op.getInputs().get(childIndex)
- .getValue();
+ AbstractLogicalOperator newChild = (AbstractLogicalOperator) op.getInputs().get(childIndex).getValue();
if (newChild != child) {
delivered = newChild.getDeliveredPhysicalProperties();
@@ -333,7 +335,7 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
private IPhysicalPropertiesVector newPropertiesDiff(AbstractLogicalOperator newChild,
IPhysicalPropertiesVector required, boolean mayExpandPartitioningProperties, IOptimizationContext context)
- throws AlgebricksException {
+ throws AlgebricksException {
IPhysicalPropertiesVector newDelivered = newChild.getDeliveredPhysicalProperties();
Map<LogicalVariable, EquivalenceClass> newChildEqClasses = context.getEquivalenceClassMap(newChild);
@@ -373,14 +375,15 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
preSortedDistinct.setDistinctByColumns(d.getDistinctByVarList());
break;
}
+ default:
}
}
private List<OrderColumn> getOrderColumnsFromGroupingProperties(List<ILocalStructuralProperty> reqd,
List<ILocalStructuralProperty> dlvd) {
- List<OrderColumn> returnedProperties = new ArrayList<OrderColumn>();
- List<LogicalVariable> rqdCols = new ArrayList<LogicalVariable>();
- List<LogicalVariable> dlvdCols = new ArrayList<LogicalVariable>();
+ List<OrderColumn> returnedProperties = new ArrayList<>();
+ List<LogicalVariable> rqdCols = new ArrayList<>();
+ List<LogicalVariable> dlvdCols = new ArrayList<>();
for (ILocalStructuralProperty r : reqd) {
r.getVariables(rqdCols);
}
@@ -389,7 +392,7 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
}
int prefix = dlvdCols.size() - 1;
- for (; prefix >= 0;) {
+ while (prefix >= 0) {
if (!rqdCols.contains(dlvdCols.get(prefix))) {
prefix--;
} else {
@@ -403,7 +406,7 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
returnedProperties.add(new OrderColumn(orderColumns.get(j).getColumn(), orderColumns.get(j).getOrder()));
}
// maintain other order columns after the required order columns
- if (returnedProperties.size() != 0) {
+ if (!returnedProperties.isEmpty()) {
for (int j = prefix + 1; j < dlvdCols.size(); j++) {
OrderColumn oc = orderColumns.get(j);
returnedProperties.add(new OrderColumn(oc.getColumn(), oc.getOrder()));
@@ -464,9 +467,9 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
return;
}
- Mutable<ILogicalOperator> topOp = new MutableObject<ILogicalOperator>();
+ Mutable<ILogicalOperator> topOp = new MutableObject<>();
topOp.setValue(op.getInputs().get(i).getValue());
- LinkedList<LocalOrderProperty> oList = new LinkedList<LocalOrderProperty>();
+ LinkedList<LocalOrderProperty> oList = new LinkedList<>();
for (ILocalStructuralProperty prop : localProperties) {
switch (prop.getPropertyType()) {
@@ -478,7 +481,7 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
LocalGroupingProperty g = (LocalGroupingProperty) prop;
Collection<LogicalVariable> vars = (g.getPreferredOrderEnforcer() != null)
? g.getPreferredOrderEnforcer() : g.getColumnSet();
- List<OrderColumn> orderColumns = new ArrayList<OrderColumn>();
+ List<OrderColumn> orderColumns = new ArrayList<>();
for (LogicalVariable v : vars) {
OrderColumn oc = new OrderColumn(v, OrderKind.ASC);
orderColumns.add(oc);
@@ -504,12 +507,12 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
private Mutable<ILogicalOperator> enforceOrderProperties(List<LocalOrderProperty> oList,
Mutable<ILogicalOperator> topOp, boolean isMicroOp, IOptimizationContext context)
- throws AlgebricksException {
- List<Pair<IOrder, Mutable<ILogicalExpression>>> oe = new LinkedList<Pair<IOrder, Mutable<ILogicalExpression>>>();
+ throws AlgebricksException {
+ List<Pair<IOrder, Mutable<ILogicalExpression>>> oe = new LinkedList<>();
for (LocalOrderProperty orderProperty : oList) {
for (OrderColumn oc : orderProperty.getOrderColumns()) {
IOrder ordType = (oc.getOrder() == OrderKind.ASC) ? OrderOperator.ASC_ORDER : OrderOperator.DESC_ORDER;
- Pair<IOrder, Mutable<ILogicalExpression>> pair = new Pair<IOrder, Mutable<ILogicalExpression>>(ordType,
+ Pair<IOrder, Mutable<ILogicalExpression>> pair = new Pair<>(ordType,
new MutableObject<ILogicalExpression>(new VariableReferenceExpression(oc.getColumn())));
oe.add(pair);
}
@@ -537,13 +540,15 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
switch (pp.getPartitioningType()) {
case UNPARTITIONED: {
List<OrderColumn> ordCols = computeOrderColumns(deliveredByChild);
- if (ordCols == null || ordCols.size() == 0) {
+ if (ordCols.isEmpty()) {
pop = new RandomMergeExchangePOperator();
} else {
if (op.getAnnotations().containsKey(OperatorAnnotations.USE_RANGE_CONNECTOR)) {
+ RangeId rangeId = context.newRangeId();
IRangeMap rangeMap = (IRangeMap) op.getAnnotations()
.get(OperatorAnnotations.USE_RANGE_CONNECTOR);
- pop = new RangePartitionMergeExchangePOperator(ordCols, domain, rangeMap,
+ addRangeForwardOperator(op.getInputs().get(i), rangeId, rangeMap, context);
+ pop = new RangePartitionMergeExchangePOperator(ordCols, domain, rangeId,
RangePartitioningType.PROJECT);
} else {
OrderColumn[] sortColumns = new OrderColumn[ordCols.size()];
@@ -554,8 +559,7 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
break;
}
case UNORDERED_PARTITIONED: {
- List<LogicalVariable> varList = new ArrayList<LogicalVariable>(
- ((UnorderedPartitionedProperty) pp).getColumnSet());
+ List<LogicalVariable> varList = new ArrayList<>(((UnorderedPartitionedProperty) pp).getColumnSet());
List<ILocalStructuralProperty> cldLocals = deliveredByChild.getLocalProperties();
List<ILocalStructuralProperty> reqdLocals = required.getLocalProperties();
boolean propWasSet = false;
@@ -580,6 +584,12 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
OrderedPartitionedProperty opp = (OrderedPartitionedProperty) pp;
List<ILocalStructuralProperty> cldLocals = deliveredByChild.getLocalProperties();
List<ILocalStructuralProperty> reqdLocals = required.getLocalProperties();
+
+ // Add RangeForwardOperator.
+ IRangeMap rangeMap = (IRangeMap) op.getAnnotations()
+ .get(OperatorAnnotations.USE_RANGE_CONNECTOR);
+ addRangeForwardOperator(op.getInputs().get(i), opp.getRangeId(), rangeMap, context);
+
boolean propWasSet = false;
pop = null;
if (reqdLocals != null && cldLocals != null && allAreOrderProps(cldLocals)) {
@@ -589,13 +599,13 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
if (PropertiesUtil.matchLocalProperties(reqdLocals, cldLocals, ecs, fds)) {
List<OrderColumn> orderColumns = getOrderColumnsFromGroupingProperties(reqdLocals,
cldLocals);
- pop = new RangePartitionMergeExchangePOperator(orderColumns, domain, opp.getRangeMap(),
+ pop = new RangePartitionMergeExchangePOperator(orderColumns, domain, opp.getRangeId(),
opp.getRangePartitioningType());
propWasSet = true;
}
}
if (!propWasSet) {
- pop = new RangePartitionExchangePOperator(opp.getOrderColumns(), domain, opp.getRangeMap(),
+ pop = new RangePartitionExchangePOperator(opp.getOrderColumns(), domain, opp.getRangeId(),
opp.getRangePartitioningType());
}
break;
@@ -630,6 +640,21 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
}
}
+ private void addRangeForwardOperator(Mutable<ILogicalOperator> op, RangeId rangeId, IRangeMap rangeMap,
+ IOptimizationContext context) throws AlgebricksException {
+ // Build the logical range-forward operator and attach a physical operator for the same range id and map.
+ RangeForwardOperator forwardOp = new RangeForwardOperator(rangeId, rangeMap);
+ forwardOp.setPhysicalOperator(new RangeForwardPOperator(rangeId, rangeMap));
+ setNewOp(op, forwardOp, context);
+ forwardOp.setExecutionMode(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
+ OperatorPropertiesUtil.computeSchemaAndPropertiesRecIfNull(forwardOp, context);
+ context.computeAndSetTypeEnvironmentForOperator(forwardOp);
+ if (AlgebricksConfig.DEBUG) {
+ AlgebricksConfig.ALGEBRICKS_LOGGER.fine(">>>> Added range forward " + forwardOp.getPhysicalOperator() + ".\n");
+ printOp((AbstractLogicalOperator) op.getValue());
+ }
+ }
+
private boolean allAreOrderProps(List<ILocalStructuralProperty> cldLocals) {
for (ILocalStructuralProperty lsp : cldLocals) {
if (lsp.getPropertyType() != PropertyType.LOCAL_ORDER_PROPERTY) {
@@ -647,17 +672,17 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
}
private List<OrderColumn> computeOrderColumns(IPhysicalPropertiesVector pv) {
- List<OrderColumn> ordCols = new ArrayList<OrderColumn>();
+ List<OrderColumn> ordCols = new ArrayList<>();
List<ILocalStructuralProperty> localProps = pv.getLocalProperties();
- if (localProps == null || localProps.size() == 0) {
- return null;
+ if (localProps == null || localProps.isEmpty()) {
+ return new ArrayList<>();
} else {
for (ILocalStructuralProperty p : localProps) {
if (p.getPropertyType() == PropertyType.LOCAL_ORDER_PROPERTY) {
LocalOrderProperty lop = (LocalOrderProperty) p;
ordCols.addAll(lop.getOrderColumns());
} else {
- return null;
+ return new ArrayList<>();
}
}
return ordCols;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
index e7cf912..101ec42 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
@@ -69,13 +69,13 @@ public class InlineVariablesRule implements IAlgebraicRewriteRule {
// Map of variables that could be replaced by their producing expression.
// Populated during the top-down sweep of the plan.
- protected Map<LogicalVariable, ILogicalExpression> varAssignRhs = new HashMap<LogicalVariable, ILogicalExpression>();
+ protected Map<LogicalVariable, ILogicalExpression> varAssignRhs = new HashMap<>();
// Visitor for replacing variable reference expressions with their originating expression.
protected InlineVariablesVisitor inlineVisitor = new InlineVariablesVisitor(varAssignRhs);
// Set of FunctionIdentifiers that we should not inline.
- protected Set<FunctionIdentifier> doNotInlineFuncs = new HashSet<FunctionIdentifier>();
+ protected Set<FunctionIdentifier> doNotInlineFuncs = new HashSet<>();
protected boolean hasRun = false;
@@ -127,9 +127,9 @@ public class InlineVariablesRule implements IAlgebraicRewriteRule {
public static boolean functionIsConstantAtRuntime(AbstractLogicalOperator op,
AbstractFunctionCallExpression funcExpr, IOptimizationContext context) throws AlgebricksException {
//make sure that there are no variables in the expression
- Set<LogicalVariable> usedVariables = new HashSet<LogicalVariable>();
+ Set<LogicalVariable> usedVariables = new HashSet<>();
funcExpr.getUsedVariables(usedVariables);
- if (usedVariables.size() > 0) {
+ if (!usedVariables.isEmpty()) {
return false;
}
return true;
@@ -176,7 +176,7 @@ public class InlineVariablesRule implements IAlgebraicRewriteRule {
}
// Variables produced by a nested subplan cannot be inlined
// in operators above the subplan.
- Set<LogicalVariable> producedVars = new HashSet<LogicalVariable>();
+ Set<LogicalVariable> producedVars = new HashSet<>();
VariableUtilities.getProducedVariables(root.getValue(), producedVars);
varAssignRhs.keySet().removeAll(producedVars);
}
@@ -186,7 +186,7 @@ public class InlineVariablesRule implements IAlgebraicRewriteRule {
// References to variables generated in the right branch of a left-outer-join cannot be inlined
// in operators above the left-outer-join.
if (op.getOperatorTag() == LogicalOperatorTag.LEFTOUTERJOIN) {
- Set<LogicalVariable> rightLiveVars = new HashSet<LogicalVariable>();
+ Set<LogicalVariable> rightLiveVars = new HashSet<>();
VariableUtilities.getLiveVariables(op.getInputs().get(1).getValue(), rightLiveVars);
varAssignRhs.keySet().removeAll(rightLiveVars);
}
@@ -208,8 +208,8 @@ public class InlineVariablesRule implements IAlgebraicRewriteRule {
protected class InlineVariablesVisitor implements ILogicalExpressionReferenceTransform {
private final Map<LogicalVariable, ILogicalExpression> varAssignRhs;
- private final Set<LogicalVariable> liveVars = new HashSet<LogicalVariable>();
- private final List<LogicalVariable> rhsUsedVars = new ArrayList<LogicalVariable>();
+ private final Set<LogicalVariable> liveVars = new HashSet<>();
+ private final List<LogicalVariable> rhsUsedVars = new ArrayList<>();
private ILogicalOperator op;
private IOptimizationContext context;
// If set, only replace this variable reference.
@@ -236,54 +236,60 @@ public class InlineVariablesRule implements IAlgebraicRewriteRule {
public boolean transform(Mutable<ILogicalExpression> exprRef) throws AlgebricksException {
ILogicalExpression e = exprRef.getValue();
switch (((AbstractLogicalExpression) e).getExpressionTag()) {
- case VARIABLE: {
- LogicalVariable var = ((VariableReferenceExpression) e).getVariableReference();
- // Restrict replacement to targetVar if it has been set.
- if (targetVar != null && var != targetVar) {
- return false;
- }
+ case VARIABLE:
+ return transformVariable(exprRef);
+ case FUNCTION_CALL:
+ return transformFunction(e);
+ default:
+ return false;
- // Make sure has not been excluded from inlining.
- if (context.shouldNotBeInlined(var)) {
- return false;
- }
+ }
+ }
- ILogicalExpression rhs = varAssignRhs.get(var);
- if (rhs == null) {
- // Variable was not produced by an assign.
- return false;
- }
+ private boolean transformFunction(ILogicalExpression e) throws AlgebricksException {
+ AbstractFunctionCallExpression fce = (AbstractFunctionCallExpression) e;
+ boolean modified = false;
+ for (Mutable<ILogicalExpression> arg : fce.getArguments()) {
+ if (transform(arg)) {
+ modified = true;
+ }
+ }
+ return modified;
+ }
- // Make sure used variables from rhs are live.
- if (liveVars.isEmpty()) {
- VariableUtilities.getLiveVariables(op, liveVars);
- }
- rhsUsedVars.clear();
- rhs.getUsedVariables(rhsUsedVars);
- for (LogicalVariable rhsUsedVar : rhsUsedVars) {
- if (!liveVars.contains(rhsUsedVar)) {
- return false;
- }
- }
+ private boolean transformVariable(Mutable<ILogicalExpression> exprRef) throws AlgebricksException {
+ LogicalVariable var = ((VariableReferenceExpression) exprRef.getValue()).getVariableReference();
+ // Restrict replacement to targetVar if it has been set.
+ if (targetVar != null && var != targetVar) {
+ return false;
+ }
- // Replace variable reference with a clone of the rhs expr.
- exprRef.setValue(rhs.cloneExpression());
- return true;
- }
- case FUNCTION_CALL: {
- AbstractFunctionCallExpression fce = (AbstractFunctionCallExpression) e;
- boolean modified = false;
- for (Mutable<ILogicalExpression> arg : fce.getArguments()) {
- if (transform(arg)) {
- modified = true;
- }
- }
- return modified;
- }
- default: {
+ // Make sure the variable has not been excluded from inlining.
+ if (context.shouldNotBeInlined(var)) {
+ return false;
+ }
+
+ ILogicalExpression rhs = varAssignRhs.get(var);
+ if (rhs == null) {
+ // Variable was not produced by an assign.
+ return false;
+ }
+
+ // Make sure used variables from rhs are live.
+ if (liveVars.isEmpty()) {
+ VariableUtilities.getLiveVariables(op, liveVars);
+ }
+ rhsUsedVars.clear();
+ rhs.getUsedVariables(rhsUsedVars);
+ for (LogicalVariable rhsUsedVar : rhsUsedVars) {
+ if (!liveVars.contains(rhsUsedVar)) {
return false;
}
}
+
+ // Replace variable reference with a clone of the rhs expr.
+ exprRef.setValue(rhs.cloneExpression());
+ return true;
}
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
index 7eb9ac8..a54947a 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
@@ -49,6 +49,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleS
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.RangeForwardOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
@@ -184,6 +185,11 @@ class ReplaceNtsWithSubplanInputOperatorVisitor implements IQueryOperatorVisitor
}
@Override
+ public ILogicalOperator visitRangeForwardOperator(RangeForwardOperator op, Void arg) throws AlgebricksException {
+ return visit(op);
+ }
+
+ @Override
public ILogicalOperator visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
return visit(op);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IRangeMap.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IRangeMap.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IRangeMap.java
new file mode 100644
index 0000000..cd14434
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IRangeMap.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.dataflow.value;
+
+public interface IRangeMap {
+
+ public int getSplitCount();
+
+ public byte[] getByteArray(int columnIndex, int splitIndex);
+
+ public int getStartOffset(int columnIndex, int splitIndex);
+
+ public int getLength(int columnIndex, int splitIndex);
+
+ public int getTag(int columnIndex, int splitIndex);
+
+ // Min value functions
+ public byte[] getMinByteArray(int columnIndex);
+
+ public int getMinStartOffset(int columnIndex);
+
+ public int getMinLength(int columnIndex);
+
+ public int getMinTag(int columnIndex);
+
+ // Max value functions
+ public byte[] getMaxByteArray(int columnIndex);
+
+ public int getMaxStartOffset(int columnIndex);
+
+ public int getMaxLength(int columnIndex);
+
+ public int getMaxTag(int columnIndex);
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IRangePartitionType.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IRangePartitionType.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IRangePartitionType.java
new file mode 100644
index 0000000..abf1495
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IRangePartitionType.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.dataflow.value;
+
+public interface IRangePartitionType {
+ public enum RangePartitioningType {
+ /**
+ * Partitioning is determined by finding the range partition where the first data point lies.
+ */
+ PROJECT,
+ /**
+ * Partitioning is determined by finding the range partition where the last data point lies.
+ */
+ PROJECT_END,
+ /**
+ * Partitioning is determined by finding all the range partitions where the data has a point.
+ */
+ SPLIT,
+ /**
+ * Partitioning is determined by finding all the range partitions where the data has a point
+ * or that come after the data point.
+ */
+ REPLICATE
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleRangePartitionComputerFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleRangePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleRangePartitionComputerFactory.java
index 5406366..9b3e607 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleRangePartitionComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleRangePartitionComputerFactory.java
@@ -21,5 +21,5 @@ package org.apache.hyracks.api.dataflow.value;
import java.io.Serializable;
public interface ITupleRangePartitionComputerFactory extends Serializable {
- public ITupleRangePartitionComputer createPartitioner();
+ public ITupleRangePartitionComputer createPartitioner(IRangeMap rangeMap);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
index bd7be3f..162d3b4 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
@@ -21,28 +21,27 @@ package org.apache.hyracks.dataflow.common.data.partition.range;
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
import org.apache.hyracks.api.dataflow.value.IBinaryRangeComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IRangeMap;
import org.apache.hyracks.api.dataflow.value.ITupleRangePartitionComputer;
import org.apache.hyracks.api.dataflow.value.ITupleRangePartitionComputerFactory;
+import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.storage.IGrowableIntArray;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType;
public class FieldRangePartitionComputerFactory implements ITupleRangePartitionComputerFactory {
private static final long serialVersionUID = 1L;
private final int[] rangeFields;
- private IRangeMap rangeMap;
private IBinaryRangeComparatorFactory[] comparatorFactories;
private RangePartitioningType rangeType;
public FieldRangePartitionComputerFactory(int[] rangeFields, IBinaryRangeComparatorFactory[] comparatorFactories,
- IRangeMap rangeMap, RangePartitioningType rangeType) {
+ RangePartitioningType rangeType) {
this.rangeFields = rangeFields;
this.comparatorFactories = comparatorFactories;
- this.rangeMap = rangeMap;
this.rangeType = rangeType;
}
- public ITupleRangePartitionComputer createPartitioner() {
+ public ITupleRangePartitionComputer createPartitioner(IRangeMap rangeMap) {
final IBinaryComparator[] minComparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
minComparators[i] = comparatorFactories[i].createMinBinaryComparator();
@@ -78,7 +77,8 @@ public class FieldRangePartitionComputerFactory implements ITupleRangePartitionC
throws HyracksDataException {
switch (rangeType) {
case PROJECT: {
- int minPartition = getPartitionMap(binarySearchRangePartition(accessor, tIndex, minComparators));
+ int minPartition = getPartitionMap(
+ binarySearchRangePartition(accessor, tIndex, minComparators));
addPartition(minPartition, map);
break;
}
@@ -89,7 +89,8 @@ public class FieldRangePartitionComputerFactory implements ITupleRangePartitionC
break;
}
case REPLICATE: {
- int minPartition = getPartitionMap(binarySearchRangePartition(accessor, tIndex, minComparators));
+ int minPartition = getPartitionMap(
+ binarySearchRangePartition(accessor, tIndex, minComparators));
int maxPartition = getPartitionMap(rangeMap.getSplitCount() + 1);
for (int pid = minPartition; pid < maxPartition; ++pid) {
addPartition(pid, map);
@@ -97,7 +98,8 @@ public class FieldRangePartitionComputerFactory implements ITupleRangePartitionC
break;
}
case SPLIT: {
- int minPartition = getPartitionMap(binarySearchRangePartition(accessor, tIndex, minComparators));
+ int minPartition = getPartitionMap(
+ binarySearchRangePartition(accessor, tIndex, minComparators));
int maxPartition = getPartitionMap(
binarySearchRangePartition(accessor, tIndex, maxComparators));
for (int pid = minPartition; pid <= maxPartition; ++pid) {