You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by pr...@apache.org on 2016/10/17 19:55:14 UTC
[24/50] [abbrv] asterixdb git commit: Working version of the range
connector and interval join partition.
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 8701851..6052c69 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -51,6 +51,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDelete
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.RangeForwardOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.AggregatePOperator;
@@ -70,6 +71,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroPreclu
import org.apache.hyracks.algebricks.core.algebra.operators.physical.NestedTupleSourcePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreSortedDistinctByPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreclusteredGroupByPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangeForwardPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.ReplicatePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.RunningAggregatePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.SinkPOperator;
@@ -234,6 +236,11 @@ public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule
op.setPhysicalOperator(new RunningAggregatePOperator());
break;
}
+ case RANGE_FORWARD: {
+ RangeForwardOperator rfo = (RangeForwardOperator) op;
+ op.setPhysicalOperator(new RangeForwardPOperator(rfo.getRangeId(), rfo.getRangeMap()));
+ break;
+ }
case REPLICATE: {
op.setPhysicalOperator(new ReplicatePOperator());
break;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java
index 774dd2a..e1b7b12 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java
@@ -58,7 +58,7 @@ public final class RangeId implements Serializable {
@Override
public String toString() {
- return "RangeId(#" + id + (partition >= 0 ? "," + partition : "") + ")";
+ return "RangeId(" + id + (partition >= 0 ? "," + partition : "") + ")";
}
@Override
@@ -74,4 +74,5 @@ public final class RangeId implements Serializable {
public int hashCode() {
return id;
}
+
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java
index c08035a..c235afb 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java
@@ -50,7 +50,7 @@ public class PartitionRangeDataWriter extends AbstractPartitionDataWriter {
@Override
public void open() throws HyracksDataException {
super.open();
- RangeForwardTaskState rangeState = (RangeForwardTaskState) ctx.getStateObject(new RangeId(rangeId.getId(), ctx));
+ RangeForwardTaskState rangeState = RangeForwardTaskState.getRangeState(rangeId.getId(), ctx);
tpc = trpcf.createPartitioner(rangeState.getRangeMap());
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java
index 04cfca3..0e4bc4d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java
@@ -73,6 +73,16 @@ public class RangeForwardOperatorDescriptor extends AbstractOperatorDescriptor {
public IRangeMap getRangeMap() {
return rangeMap;
}
+
+ public static RangeForwardTaskState getRangeState(int rangeId, IHyracksTaskContext ctx)
+ throws HyracksDataException {
+ RangeId rangeIdObject = new RangeId(rangeId, ctx);
+ RangeForwardTaskState rangeState = (RangeForwardTaskState) ctx.getStateObject(rangeIdObject);
+ if (rangeState == null) {
+ throw new HyracksDataException("Range state is missing for " + rangeIdObject + ".");
+ }
+ return rangeState;
+ }
}
private final class ForwardActivityNode extends AbstractActivityNode {