Posted to notifications@asterixdb.apache.org by "Preston Carman (Code Review)" <do...@asterixdb.incubator.apache.org> on 2015/10/02 02:14:46 UTC
Change in asterixdb[master]: Range connector update and new merge interval join.
Preston Carman has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/424
Change subject: Range connector update and new merge interval join.
......................................................................
Range connector update and new merge interval join.
Change-Id: Ie6bca14b81c41859f0f89cdc1fc95efd393e79f3
---
A asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/interval/SortMergeIntervalJoinPOperator.java
M asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
M asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
A asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java
M asterix-app/data/csv/sample_01.csv
A asterix-app/src/test/resources/runtimets/queries/temporal/TemporalQueries.xml
A asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.1.ddl.aql
A asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.2.update.aql
A asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.3.query.aql
A asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.4.query.aql
A asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.5.query.aql
A asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.6.query.aql
A asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.7.query.aql
A asterix-app/src/test/resources/runtimets/results/temporal/interval_joins/interval_overlaps/interval_overlaps.3.adm
A asterix-app/src/test/resources/runtimets/results/temporal/interval_joins/interval_overlaps/interval_overlaps.4.adm
A asterix-app/src/test/resources/runtimets/results/temporal/interval_joins/interval_overlaps/interval_overlaps.5.adm
A asterix-app/src/test/resources/runtimets/results/temporal/interval_joins/interval_overlaps/interval_overlaps.6.adm
A asterix-app/src/test/resources/runtimets/results/temporal/interval_joins/interval_overlaps/interval_overlaps.7.adm
M asterix-app/src/test/resources/runtimets/testsuite.xml
M asterix-aql/src/main/java/org/apache/asterix/aql/util/RangeMapBuilder.java
M asterix-aql/src/main/javacc/AQL.jj
A asterix-common/src/main/java/org/apache/asterix/common/annotations/IntervalJoinExpressionAnnotation.java
M asterix-common/src/main/java/org/apache/asterix/common/annotations/SkipSecondaryIndexSearchExpressionAnnotation.java
M asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AIntervalPartialBinaryComparatorFactory.java
A asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/allenrelations/AllenRelationsBinaryComparatorFactoryProvider.java
A asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/allenrelations/OverlapIntervalBinaryComparatorFactory.java
A asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/rangeinterval/RangeIntervalProjectBinaryComparatorFactory.java
A asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/rangeinterval/RangeIntervalReplicateBinaryComparatorFactory.java
A asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/rangeinterval/RangeIntervalSplitBinaryComparatorFactory.java
M asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AIntervalSerializerDeserializer.java
A asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/AIntervalPointable.java
A asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/interval/SortMergeIntervalJoinLocks.java
A asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/interval/SortMergeIntervalJoinOperatorDescriptor.java
A asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/interval/SortMergeIntervalJoiner.java
A asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/interval/SortMergeIntervalStatus.java
M pom.xml
36 files changed, 1,958 insertions(+), 171 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/24/424/1
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/interval/SortMergeIntervalJoinPOperator.java b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/interval/SortMergeIntervalJoinPOperator.java
new file mode 100644
index 0000000..ae6d71c
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/interval/SortMergeIntervalJoinPOperator.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.algebra.operators.physical.interval;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.runtime.operators.interval.SortMergeIntervalJoinOperatorDescriptor;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractJoinPOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderedPartitionedProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
+import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType;
+
+public class SortMergeIntervalJoinPOperator extends AbstractJoinPOperator {
+
+ private final int memSize;
+ protected final List<LogicalVariable> keysLeftBranch;
+ protected final List<LogicalVariable> keysRightBranch;
+ private final IBinaryComparatorFactoryProvider bcfp;
+ private IRangeMap rangeMap;
+
+ public SortMergeIntervalJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType, int memSize,
+ List<LogicalVariable> sideLeft, List<LogicalVariable> sideRight, IBinaryComparatorFactoryProvider bcfp,
+ IRangeMap rangeMap) {
+ super(kind, partitioningType);
+ this.memSize = memSize;
+ this.keysLeftBranch = sideLeft;
+ this.keysRightBranch = sideRight;
+ this.bcfp = bcfp;
+ this.rangeMap = rangeMap;
+ }
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.SORT_MERGE_INTERVAL_JOIN;
+ }
+
+ @Override
+ public boolean isMicroOperator() {
+ return false;
+ }
+
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator iop, IOptimizationContext context) {
+ IPartitioningProperty pp = null;
+ ArrayList<OrderColumn> order = new ArrayList<OrderColumn>();
+ for (LogicalVariable v : keysLeftBranch) {
+ order.add(new OrderColumn(v, OrderKind.ASC));
+ }
+ pp = new OrderedPartitionedProperty(order, null, rangeMap, RangePartitioningType.PROJECT);
+ List<ILocalStructuralProperty> propsLocal = new ArrayList<ILocalStructuralProperty>();
+ propsLocal.add(new LocalOrderProperty(order));
+ deliveredProperties = new StructuralPropertiesVector(pp, propsLocal);
+ }
+
+ @Override
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator iop,
+ IPhysicalPropertiesVector reqdByParent) {
+ StructuralPropertiesVector[] pv = new StructuralPropertiesVector[2];
+ AbstractLogicalOperator op = (AbstractLogicalOperator) iop;
+
+ IPartitioningProperty ppLeft = null;
+ List<ILocalStructuralProperty> ispLeft = new ArrayList<ILocalStructuralProperty>();
+ IPartitioningProperty ppRight = null;
+ List<ILocalStructuralProperty> ispRight = new ArrayList<ILocalStructuralProperty>();
+ if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
+ ArrayList<OrderColumn> orderLeft = new ArrayList<OrderColumn>();
+ for (LogicalVariable v : keysLeftBranch) {
+ orderLeft.add(new OrderColumn(v, OrderKind.ASC));
+ }
+ ppLeft = new OrderedPartitionedProperty(orderLeft, null, rangeMap, RangePartitioningType.PROJECT);
+ ispLeft.add(new LocalOrderProperty(orderLeft));
+
+ ArrayList<OrderColumn> orderRight = new ArrayList<OrderColumn>();
+ for (LogicalVariable v : keysRightBranch) {
+ orderRight.add(new OrderColumn(v, OrderKind.ASC));
+ }
+ ppRight = new OrderedPartitionedProperty(orderRight, null, rangeMap, RangePartitioningType.SPLIT);
+ ispRight.add(new LocalOrderProperty(orderRight));
+ }
+
+ pv[0] = new StructuralPropertiesVector(ppLeft, ispLeft);
+ pv[1] = new StructuralPropertiesVector(ppRight, ispRight);
+ IPartitioningRequirementsCoordinator prc = IPartitioningRequirementsCoordinator.NO_COORDINATION;
+ return new PhysicalRequirements(pv, prc);
+ }
+
+ @Override
+ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+ IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+ int[] keysLeft = JobGenHelper.variablesToFieldIndexes(keysLeftBranch, inputSchemas[0]);
+ int[] keysRight = JobGenHelper.variablesToFieldIndexes(keysRightBranch, inputSchemas[1]);
+ IVariableTypeEnvironment env = context.getTypeEnvironment(op);
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[keysLeft.length];
+ int i = 0;
+ for (LogicalVariable v : keysLeftBranch) {
+ Object t = env.getVarType(v);
+ comparatorFactories[i++] = bcfp.getBinaryComparatorFactory(t, true);
+ }
+
+ IOperatorDescriptorRegistry spec = builder.getJobSpec();
+ RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema,
+ context);
+
+ SortMergeIntervalJoinOperatorDescriptor opDesc = new SortMergeIntervalJoinOperatorDescriptor(spec, memSize,
+ recordDescriptor, keysLeft, keysRight, comparatorFactories);
+ contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
+
+ ILogicalOperator src1 = op.getInputs().get(0).getValue();
+ builder.contributeGraphEdge(src1, 0, op, 0);
+ ILogicalOperator src2 = op.getInputs().get(1).getValue();
+ builder.contributeGraphEdge(src2, 0, op, 1);
+ }
+}
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index a1c9d8c..940a814 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -289,8 +289,8 @@
//Turned off the following rule for now not to change OptimizerTest results.
//physicalRewritesAllLevels.add(new IntroduceTransactionCommitByAssignOpRule());
physicalRewritesAllLevels.add(new ReplaceSinkOpWithCommitOpRule());
- physicalRewritesAllLevels.add(new SetAlgebricksPhysicalOperatorsRule());
physicalRewritesAllLevels.add(new SetAsterixPhysicalOperatorsRule());
+ physicalRewritesAllLevels.add(new SetAlgebricksPhysicalOperatorsRule());
physicalRewritesAllLevels.add(new IntroduceInstantLockSearchCallbackRule());
physicalRewritesAllLevels.add(new AddEquivalenceClassForRecordConstructorRule());
physicalRewritesAllLevels.add(new EnforceStructuralPropertiesRule());
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index 353c9be..9feaa09 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -33,6 +33,7 @@
import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
import org.apache.asterix.optimizer.rules.am.AccessMethodJobGenParams;
import org.apache.asterix.optimizer.rules.am.BTreeJobGenParams;
+import org.apache.asterix.optimizer.rules.util.JoinUtils;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -61,7 +62,6 @@
import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreclusteredGroupByPOperator;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
-import org.apache.hyracks.algebricks.rewriter.util.JoinUtils;
public class SetAsterixPhysicalOperatorsRule implements IAlgebraicRewriteRule {
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java
new file mode 100644
index 0000000..d17e523
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.util;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.asterix.algebra.operators.physical.interval.SortMergeIntervalJoinPOperator;
+import org.apache.asterix.common.annotations.IntervalJoinExpressionAnnotation;
+import org.apache.asterix.dataflow.data.nontagged.comparators.allenrelations.AllenRelationsBinaryComparatorFactoryProvider;
+import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionAnnotation;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractJoinPOperator.JoinPartitioningType;
+import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
+
+public class JoinUtils {
+
+ private static final Logger LOGGER = Logger.getLogger(JoinUtils.class.getName());
+
+ public static void setJoinAlgorithmAndExchangeAlgo(AbstractBinaryJoinOperator op, IOptimizationContext context)
+ throws AlgebricksException {
+ List<LogicalVariable> sideLeft = new LinkedList<LogicalVariable>();
+ List<LogicalVariable> sideRight = new LinkedList<LogicalVariable>();
+ List<LogicalVariable> varsLeft = op.getInputs().get(0).getValue().getSchema();
+ List<LogicalVariable> varsRight = op.getInputs().get(1).getValue().getSchema();
+ ILogicalExpression conditionLE = op.getCondition().getValue();
+ if (conditionLE.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+ return;
+ }
+ AbstractFunctionCallExpression fexp = (AbstractFunctionCallExpression) conditionLE;
+ if (isIntervalJoinCondition(fexp, varsLeft, varsRight, sideLeft, sideRight)) {
+ IntervalJoinExpressionAnnotation ijea = getIntervalJoinAnnotation(fexp);
+ if (ijea == null) {
+ // Use default join method.
+ return;
+ }
+ if (ijea.isMergeJoin()) {
+ // Sort Merge.
+ LOGGER.fine("Interval Join - Merge");
+ setSortMergeIntervalJoinOp(op, sideLeft, sideRight, ijea.getRangeMap(), context);
+ } else if (ijea.isIopJoin()) {
+ // Overlapping Interval Partition.
+ LOGGER.fine("Interval Join - IOP");
+ } else if (ijea.isSpatialJoin()) {
+ // Spatial Partition.
+ LOGGER.fine("Interval Join - Spatial Partitioning");
+ }
+ }
+ }
+
+ private static IntervalJoinExpressionAnnotation getIntervalJoinAnnotation(AbstractFunctionCallExpression fexp) {
+ Iterator<IExpressionAnnotation> annotationIter = fexp.getAnnotations().values().iterator();
+ while (annotationIter.hasNext()) {
+ IExpressionAnnotation annotation = annotationIter.next();
+ if (annotation instanceof IntervalJoinExpressionAnnotation) {
+ return (IntervalJoinExpressionAnnotation) annotation;
+ }
+ }
+ return null;
+ }
+
+ private static void setSortMergeIntervalJoinOp(AbstractBinaryJoinOperator op, List<LogicalVariable> sideLeft,
+ List<LogicalVariable> sideRight, IRangeMap rangeMap, IOptimizationContext context) {
+ IBinaryComparatorFactoryProvider bcfp = (IBinaryComparatorFactoryProvider) AllenRelationsBinaryComparatorFactoryProvider.INSTANCE;
+ op.setPhysicalOperator(new SortMergeIntervalJoinPOperator(op.getJoinKind(), JoinPartitioningType.BROADCAST,
+ context.getPhysicalOptimizationConfig().getMaxRecordsPerFrame(), sideLeft, sideRight, bcfp, rangeMap));
+ }
+
+ private static boolean isIntervalJoinCondition(ILogicalExpression e, Collection<LogicalVariable> inLeftAll,
+ Collection<LogicalVariable> inRightAll, Collection<LogicalVariable> outLeftFields,
+ Collection<LogicalVariable> outRightFields) {
+ switch (e.getExpressionTag()) {
+ case FUNCTION_CALL: {
+ AbstractFunctionCallExpression fexp = (AbstractFunctionCallExpression) e;
+ FunctionIdentifier fi = fexp.getFunctionIdentifier();
+ if (!fi.equals(AsterixBuiltinFunctions.INTERVAL_OVERLAPS)) {
+ return false;
+ }
+ ILogicalExpression opLeft = fexp.getArguments().get(0).getValue();
+ ILogicalExpression opRight = fexp.getArguments().get(1).getValue();
+ if (opLeft.getExpressionTag() != LogicalExpressionTag.VARIABLE
+ || opRight.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+ return false;
+ }
+ LogicalVariable var1 = ((VariableReferenceExpression) opLeft).getVariableReference();
+ if (inLeftAll.contains(var1) && !outLeftFields.contains(var1)) {
+ outLeftFields.add(var1);
+ } else if (inRightAll.contains(var1) && !outRightFields.contains(var1)) {
+ outRightFields.add(var1);
+ } else {
+ return false;
+ }
+ LogicalVariable var2 = ((VariableReferenceExpression) opRight).getVariableReference();
+ if (inLeftAll.contains(var2) && !outLeftFields.contains(var2)) {
+ outLeftFields.add(var2);
+ } else if (inRightAll.contains(var2) && !outRightFields.contains(var2)) {
+ outRightFields.add(var2);
+ } else {
+ return false;
+ }
+ return true;
+ }
+ default: {
+ return false;
+ }
+ }
+ }
+}
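The side classification done by isIntervalJoinCondition above can be read in isolation. The following is a minimal sketch only: it uses plain strings in place of LogicalVariable, and the class and method names (JoinSideSketch, classify) are hypothetical and not part of this change.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

final class JoinSideSketch {

    /**
     * Mirrors the if / else-if chain in isIntervalJoinCondition: record v as a
     * left key if it comes from the left input, otherwise as a right key if it
     * comes from the right input. Returns false when v can be added to neither list.
     */
    static boolean classify(String v, Collection<String> inLeft, Collection<String> inRight,
            Collection<String> outLeft, Collection<String> outRight) {
        if (inLeft.contains(v) && !outLeft.contains(v)) {
            outLeft.add(v);
            return true;
        } else if (inRight.contains(v) && !outRight.contains(v)) {
            outRight.add(v);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        // Hypothetical schemas for the two join inputs.
        List<String> leftSchema = Arrays.asList("$f", "$employment");
        List<String> rightSchema = Arrays.asList("$d", "$attendance");

        List<String> keysLeft = new ArrayList<>();
        List<String> keysRight = new ArrayList<>();

        // Both arguments of the join condition must classify successfully.
        boolean ok = classify("$employment", leftSchema, rightSchema, keysLeft, keysRight)
                && classify("$attendance", leftSchema, rightSchema, keysLeft, keysRight);

        System.out.println(ok + " left=" + keysLeft + " right=" + keysRight);
    }
}
```

As sketched, two variables that both come from the same input are also accepted and leave the other key list empty. Whether a caller needs to guard against that case is not shown in this change.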
diff --git a/asterix-app/data/csv/sample_01.csv b/asterix-app/data/csv/sample_01.csv
index 4dd437a..fbba382 100644
--- a/asterix-app/data/csv/sample_01.csv
+++ b/asterix-app/data/csv/sample_01.csv
@@ -1,8 +1,8 @@
-1,0.899682764,5.6256,2013-08-07,07:22:35,1979-02-25T23:48:27.034
-2,0.669052398,,-1923-03-29,19:33:34,-1979-02-25T23:48:27.002
-3,0.572733058,192674,-1923-03-28,19:33:34,-1979-02-25T23:48:27.001
-4,,192674,-1923-03-27,19:33:34,-1979-02-25T23:48:27.001
-5,0.572733058,192674,,19:33:34,-1979-02-25T23:48:27.001
-6,0.572733058,192674,-1923-03-25,,-1979-02-25T23:48:27.001
-7,0.572733058,192674,-1923-03-24,19:33:34,
+1,0.899682764,5.6256,2013-08-07,07:22:35,1979-02-25T23:48:27.034
+2,0.669052398,,-1923-03-29,19:33:34,-1979-02-25T23:48:27.002
+3,0.572733058,192674,-1923-03-28,19:33:34,-1979-02-25T23:48:27.001
+4,,192674,-1923-03-27,19:33:34,-1979-02-25T23:48:27.001
+5,0.572733058,192674,,19:33:34,-1979-02-25T23:48:27.001
+6,0.572733058,192674,-1923-03-25,,-1979-02-25T23:48:27.001
+7,0.572733058,192674,-1923-03-24,19:33:34,
8,,,,,
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/temporal/TemporalQueries.xml b/asterix-app/src/test/resources/runtimets/queries/temporal/TemporalQueries.xml
new file mode 100644
index 0000000..04a5329
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/temporal/TemporalQueries.xml
@@ -0,0 +1,156 @@
+<!--
+ ! Copyright 2009-2013 by The Regents of the University of California
+ ! Licensed under the Apache License, Version 2.0 (the "License");
+ ! you may not use this file except in compliance with the License.
+ ! you may obtain a copy of the License from
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing, software
+ ! distributed under the License is distributed on an "AS IS" BASIS,
+ ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ! See the License for the specific language governing permissions and
+ ! limitations under the License.
+ !-->
+ <test-case FilePath="temporal">
+ <compilation-unit name="overlap_bins_gby_3">
+ <output-dir compare="Text">overlap_bins_gby_3</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="temporal">
+ <compilation-unit name="agg_01">
+ <output-dir compare="Text">agg_01</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="temporal">
+ <compilation-unit name="overlap_bins_gby_1">
+ <output-dir compare="Text">overlap_bins_gby_1</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="temporal">
+ <compilation-unit name="duration_functions">
+ <output-dir compare="Text">duration_functions</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="temporal">
+ <compilation-unit name="overlap_bins_gby_0">
+ <output-dir compare="Text">overlap_bins_gby_0</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="temporal">
+ <compilation-unit name="get_overlapping_interval">
+ <output-dir compare="Text">get_overlapping_interval</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="temporal">
+ <compilation-unit name="overlap_bins">
+ <output-dir compare="Text">overlap_bins</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="temporal">
+ <compilation-unit name="parse_02">
+ <output-dir compare="Text">parse_02</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="temporal">
+ <compilation-unit name="parse_01">
+ <output-dir compare="Text">parse_01</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="temporal">
+ <compilation-unit name="day_of_week_01">
+ <output-dir compare="Text">day_of_week_01</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="temporal">
+ <compilation-unit name="interval_bin">
+ <output-dir compare="Text">interval_bin</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="temporal">
+ <compilation-unit name="interval_bin_gby_0">
+ <output-dir compare="Text">interval_bin_gby_0</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="temporal">
+ <compilation-unit name="interval_bin_gby_1">
+ <output-dir compare="Text">interval_bin_gby_1</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="temporal">
+ <compilation-unit name="accessors">
+ <output-dir compare="Text">accessors</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="temporal">
+ <compilation-unit name="accessors_interval">
+ <output-dir compare="Text">accessors_interval</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="temporal">
+ <compilation-unit name="accessors_interval_null">
+ <output-dir compare="Text">accessors_interval_null</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="temporal">
+ <compilation-unit name="adjust_timezone">
+ <output-dir compare="Text">adjust_timezone</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="temporal">
+ <compilation-unit name="calendar_duration">
+ <output-dir compare="Text">calendar_duration</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="temporal">
+ <compilation-unit name="date_functions">
+ <output-dir compare="Text">date_functions</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="temporal">
+ <compilation-unit name="datetime_functions">
+ <output-dir compare="Text">datetime_functions</output-dir>
+ </compilation-unit>
+ </test-case>
+ <!--
+ <test-case FilePath="temporal">
+ <compilation-unit name="insert_from_delimited_ds">
+ <output-dir compare="Text">insert_from_delimited_ds</output-dir>
+ </compilation-unit>
+ </test-case>
+ -->
+ <test-case FilePath="temporal">
+ <compilation-unit name="insert_from_ext_ds">
+ <output-dir compare="Text">insert_from_ext_ds</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="temporal">
+ <compilation-unit name="insert_from_ext_ds_2">
+ <output-dir compare="Text">insert_from_ext_ds_2</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="temporal">
+ <compilation-unit name="interval_functions">
+ <output-dir compare="Text">interval_functions</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="temporal/interval_joins">
+ <compilation-unit name="interval_overlaps">
+ <output-dir compare="Text">interval_overlaps</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="temporal">
+ <compilation-unit name="time_functions">
+ <output-dir compare="Text">time_functions</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="constructor">
+ <compilation-unit name="interval">
+ <output-dir compare="Text">interval</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="temporal">
+ <compilation-unit name="duration_comps">
+ <output-dir compare="Text">duration_comps</output-dir>
+ </compilation-unit>
+ </test-case>
diff --git a/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.1.ddl.aql
new file mode 100644
index 0000000..f494ebe
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.1.ddl.aql
@@ -0,0 +1,26 @@
+/*
+ * Description : Check temporal join functionality for interval
+ * Expected Result : Success
+ * Date : 26th Jun, 2015
+ */
+
+drop dataverse TinyCollege if exists;
+create dataverse TinyCollege;
+use dataverse TinyCollege;
+
+create type StaffType as open {
+ name: string,
+ office: string,
+ employment: interval
+}
+create dataset Staff(StaffType)
+primary key name;
+
+
+create type StudentType as open {
+ name: string,
+ office: string,
+ attendance: interval
+}
+create dataset Students(StudentType)
+primary key name;
diff --git a/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.2.update.aql
new file mode 100644
index 0000000..0b7b8fa
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.2.update.aql
@@ -0,0 +1,22 @@
+/*
+ * Description : Check temporal join functionality for interval
+ * Expected Result : Success
+ * Date : 26th Jun, 2015
+ */
+
+use dataverse TinyCollege;
+
+insert into dataset Staff ({"name":"Alex", "office":"A", "employment":interval-from-date(date("2003-01-01"), date("2008-01-01"))});
+insert into dataset Staff ({"name":"Elisabeth", "office":"B", "employment":interval-from-date(date("2002-01-01"), date("2010-01-01"))});
+insert into dataset Staff ({"name":"Frank", "office":"A", "employment":interval-from-date(date("2004-01-01"), date("2009-01-01"))});
+insert into dataset Staff ({"name":"Henry", "office":"C", "employment":interval-from-date(date("2003-01-01"), date("2008-01-01"))});
+insert into dataset Staff ({"name":"Mary", "office":"B", "employment":interval-from-date(date("2006-01-01"), date("2010-01-01"))});
+insert into dataset Staff ({"name":"Vicky", "office":"D", "employment":interval-from-date(date("2001-01-01"), date("2010-01-01"))});
+
+insert into dataset Students ({"name":"Charles", "office":"X", "attendance":interval-from-date(date("2001-01-01"), date("2004-01-01"))});
+insert into dataset Students ({"name":"Frank", "office":"Y", "attendance":interval-from-date(date("2001-01-01"), date("2004-01-01"))});
+insert into dataset Students ({"name":"Karen", "office":"Y", "attendance":interval-from-date(date("2007-01-01"), date("2009-01-01"))});
+insert into dataset Students ({"name":"Mary", "office":"Y", "attendance":interval-from-date(date("2002-01-01"), date("2005-01-01"))});
+insert into dataset Students ({"name":"Olga", "office":"Z", "attendance":interval-from-date(date("2001-01-01"), date("2003-01-01"))});
+insert into dataset Students ({"name":"Steve", "office":"Z", "attendance":interval-from-date(date("2007-01-01"), date("2010-01-01"))});
+
diff --git a/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.3.query.aql b/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.3.query.aql
new file mode 100644
index 0000000..c8b2d5e
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.3.query.aql
@@ -0,0 +1,14 @@
+/*
+ * Description : Check temporal join functionality for interval
+ * Expected Result : Success
+ * Date : 26th Jun, 2015
+ */
+
+use dataverse TinyCollege;
+
+for $f in dataset Staff
+for $d in dataset Students
+where interval-overlaps($f.employment, $d.attendance)
+/*+ range ["F", "L", "R"] */
+order by $f.name, $d.name
+return { "staff" : $f.name, "student" : $d.name }
diff --git a/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.4.query.aql b/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.4.query.aql
new file mode 100644
index 0000000..22e080b
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.4.query.aql
@@ -0,0 +1,14 @@
+/*
+ * Description : Check temporal join functionality for interval
+ * Expected Result : Success
+ * Date : 26th Jun, 2015
+ */
+
+use dataverse TinyCollege;
+
+for $f in dataset Staff
+for $d in dataset Students
+where /*+ interval-merge-join [991353600000, 1054425600000, 1149120000000] */ interval-overlaps($f.employment, $d.attendance)
+/*+ range ["F", "L", "R"] */
+order by $f.name, $d.name
+return { "staff" : $f.name, "student" : $d.name }
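Reviewer note: the three split points in the interval-merge-join hint look like epoch milliseconds. Treating them that way is an assumption, not something the patch states. The sketch below (class and method names are hypothetical) converts them to UTC dates, which places them inside the 2001 to 2010 span covered by the test data.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

// Sketch only: assumes the hint values are milliseconds since the Unix epoch.
class SplitPointDates {

    // Convert an epoch-millisecond value to the calendar date it falls on in UTC.
    static LocalDate toUtcDate(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC).toLocalDate();
    }

    public static void main(String[] args) {
        long[] splitPoints = { 991353600000L, 1054425600000L, 1149120000000L };
        for (long point : splitPoints) {
            System.out.println(point + " -> " + toUtcDate(point));
        }
    }
}
```

Under that assumption the split points fall on 1 June of 2001, 2003 and 2006.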
diff --git a/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.5.query.aql b/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.5.query.aql
new file mode 100644
index 0000000..eb5c5c1
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.5.query.aql
@@ -0,0 +1,16 @@
+/*
+ * Description : Check temporal join functionality for interval
+ * Expected Result : Success
+ * Date : 26th Jun, 2015
+ */
+
+use dataverse TinyCollege;
+
+/*
+for $f in dataset Staff
+for $d in dataset Students
+where /*+ interval-iop-join [991353600000, 1054425600000, 1149120000000] */ interval-overlaps($f.employment, $d.attendance)
+/*+ range ["F", "L", "R"] */
+order by $f.name, $d.name
+return { "staff" : $f.name, "student" : $d.name }
+*/
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.6.query.aql b/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.6.query.aql
new file mode 100644
index 0000000..86a2587
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.6.query.aql
@@ -0,0 +1,16 @@
+/*
+ * Description : Check temporal join functionality for interval
+ * Expected Result : Success
+ * Date : 26th Jun, 2015
+ */
+
+use dataverse TinyCollege;
+
+/*
+for $f in dataset Staff
+for $d in dataset Students
+where /*+ interval-spatial-join [991353600000, 1054425600000, 1149120000000] */ interval-overlaps($f.employment, $d.attendance)
+/*+ range ["F", "L", "R"] */
+order by $f.name, $d.name
+return { "staff" : $f.name, "student" : $d.name }
+*/
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.7.query.aql b/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.7.query.aql
new file mode 100644
index 0000000..f8da7f6
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlaps/interval_overlaps.7.query.aql
@@ -0,0 +1,14 @@
+/*
+ * Description : Check temporal join functionality for interval
+ * Expected Result : Success
+ * Date : 26th Jun, 2015
+ */
+
+use dataverse TinyCollege;
+
+for $f in dataset Staff
+for $d in dataset Students
+where interval-overlaps($d.attendance, $f.employment)
+/*+ range ["F", "L", "R"] */
+order by $f.name, $d.name
+return { "staff" : $f.name, "student" : $d.name }
diff --git a/asterix-app/src/test/resources/runtimets/results/temporal/interval_joins/interval_overlaps/interval_overlaps.3.adm b/asterix-app/src/test/resources/runtimets/results/temporal/interval_joins/interval_overlaps/interval_overlaps.3.adm
new file mode 100644
index 0000000..a9bd2e4
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/temporal/interval_joins/interval_overlaps/interval_overlaps.3.adm
@@ -0,0 +1,6 @@
+[ { "staff": "Alex", "student": "Karen" }
+, { "staff": "Alex", "student": "Steve" }
+, { "staff": "Frank", "student": "Steve" }
+, { "staff": "Henry", "student": "Karen" }
+, { "staff": "Henry", "student": "Steve" }
+ ]
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/temporal/interval_joins/interval_overlaps/interval_overlaps.4.adm b/asterix-app/src/test/resources/runtimets/results/temporal/interval_joins/interval_overlaps/interval_overlaps.4.adm
new file mode 100644
index 0000000..a9bd2e4
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/temporal/interval_joins/interval_overlaps/interval_overlaps.4.adm
@@ -0,0 +1,6 @@
+[ { "staff": "Alex", "student": "Karen" }
+, { "staff": "Alex", "student": "Steve" }
+, { "staff": "Frank", "student": "Steve" }
+, { "staff": "Henry", "student": "Karen" }
+, { "staff": "Henry", "student": "Steve" }
+ ]
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/temporal/interval_joins/interval_overlaps/interval_overlaps.5.adm b/asterix-app/src/test/resources/runtimets/results/temporal/interval_joins/interval_overlaps/interval_overlaps.5.adm
new file mode 100644
index 0000000..a9bd2e4
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/temporal/interval_joins/interval_overlaps/interval_overlaps.5.adm
@@ -0,0 +1,6 @@
+[ { "staff": "Alex", "student": "Karen" }
+, { "staff": "Alex", "student": "Steve" }
+, { "staff": "Frank", "student": "Steve" }
+, { "staff": "Henry", "student": "Karen" }
+, { "staff": "Henry", "student": "Steve" }
+ ]
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/temporal/interval_joins/interval_overlaps/interval_overlaps.6.adm b/asterix-app/src/test/resources/runtimets/results/temporal/interval_joins/interval_overlaps/interval_overlaps.6.adm
new file mode 100644
index 0000000..a9bd2e4
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/temporal/interval_joins/interval_overlaps/interval_overlaps.6.adm
@@ -0,0 +1,6 @@
+[ { "staff": "Alex", "student": "Karen" }
+, { "staff": "Alex", "student": "Steve" }
+, { "staff": "Frank", "student": "Steve" }
+, { "staff": "Henry", "student": "Karen" }
+, { "staff": "Henry", "student": "Steve" }
+ ]
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/temporal/interval_joins/interval_overlaps/interval_overlaps.7.adm b/asterix-app/src/test/resources/runtimets/results/temporal/interval_joins/interval_overlaps/interval_overlaps.7.adm
new file mode 100644
index 0000000..65a07bb
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/temporal/interval_joins/interval_overlaps/interval_overlaps.7.adm
@@ -0,0 +1,11 @@
+[ { "staff": "Alex", "student": "Charles" }
+, { "staff": "Alex", "student": "Frank" }
+, { "staff": "Alex", "student": "Mary" }
+, { "staff": "Elisabeth", "student": "Charles" }
+, { "staff": "Elisabeth", "student": "Frank" }
+, { "staff": "Elisabeth", "student": "Olga" }
+, { "staff": "Frank", "student": "Mary" }
+, { "staff": "Henry", "student": "Charles" }
+, { "staff": "Henry", "student": "Frank" }
+, { "staff": "Henry", "student": "Mary" }
+ ]
\ No newline at end of file
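Reviewer note: the expected results above can be cross-checked by hand. The sketch below is a plain nested-loop join over the rows from interval_overlaps.2.update.aql, with each interval reduced to a {startYear, endYear} pair (a simplification for readability) and with the overlaps predicate written to match the condition in OverlapIntervalBinaryComparatorFactory. All names in it are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: intervals are reduced to {startYear, endYear} pairs for readability.
// The class and method names are hypothetical and not part of the change.
class IntervalOverlapsSketch {

    static final String[] STAFF = { "Alex", "Elisabeth", "Frank", "Henry", "Mary", "Vicky" };
    static final int[][] EMPLOYMENT = { { 2003, 2008 }, { 2002, 2010 }, { 2004, 2009 },
            { 2003, 2008 }, { 2006, 2010 }, { 2001, 2010 } };
    static final String[] STUDENTS = { "Charles", "Frank", "Karen", "Mary", "Olga", "Steve" };
    static final int[][] ATTENDANCE = { { 2001, 2004 }, { 2001, 2004 }, { 2007, 2009 },
            { 2002, 2005 }, { 2001, 2003 }, { 2007, 2010 } };

    // Allen's "overlaps": a starts first, b starts before a ends, and a ends before b ends.
    static boolean overlaps(int[] a, int[] b) {
        return a[0] < b[0] && a[1] > b[0] && a[1] < b[1];
    }

    // Nested-loop join; studentFirst swaps the predicate arguments as query 7 does.
    static List<String> join(boolean studentFirst) {
        List<String> pairs = new ArrayList<>();
        for (int i = 0; i < STAFF.length; i++) {
            for (int j = 0; j < STUDENTS.length; j++) {
                boolean match = studentFirst ? overlaps(ATTENDANCE[j], EMPLOYMENT[i])
                        : overlaps(EMPLOYMENT[i], ATTENDANCE[j]);
                if (match) {
                    pairs.add(STAFF[i] + "-" + STUDENTS[j]);
                }
            }
        }
        return pairs;
    }
}
```

join(false) yields the five pairs listed in the .3 through .6 result files, and join(true) yields the ten pairs in the .7 result file. Because the predicate is not symmetric, swapping the arguments changes the answer.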
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterix-app/src/test/resources/runtimets/testsuite.xml
index d706feb..6e92521 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -19,6 +19,7 @@
<!DOCTYPE test-suite [
<!ENTITY RecordsQueries SYSTEM "queries/records/RecordsQueries.xml">
+<!ENTITY TemporalQueries SYSTEM "queries/temporal/TemporalQueries.xml">
]>
<test-suite
@@ -6069,7 +6070,7 @@
<output-dir compare="Text">feeds_04</output-dir>
</compilation-unit>
</test-case>
-
+
<test-case FilePath="feeds">
<compilation-unit name="feeds_06">
<output-dir compare="Text">feeds_06</output-dir>
@@ -6108,7 +6109,7 @@
<output-dir compare="Text">feeds_12</output-dir>
</compilation-unit>
</test-case>
-
+
<test-case FilePath="feeds">
<compilation-unit name="issue_230_feeds">
<output-dir compare="Text">issue_230_feeds</output-dir>
@@ -6124,7 +6125,7 @@
</test-group>
<test-group name="hdfs">
- <test-case FilePath="hdfs">
+ <test-case FilePath="hdfs">
<compilation-unit name="hdfs_shortcircuit">
<output-dir compare="Text">hdfs_shortcircuit</output-dir>
</compilation-unit>
@@ -6200,143 +6201,7 @@
</test-case>
</test-group>
<test-group name="temporal">
- <test-case FilePath="temporal">
- <compilation-unit name="overlap_bins_gby_3">
- <output-dir compare="Text">overlap_bins_gby_3</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="temporal">
- <compilation-unit name="agg_01">
- <output-dir compare="Text">agg_01</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="temporal">
- <compilation-unit name="overlap_bins_gby_1">
- <output-dir compare="Text">overlap_bins_gby_1</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="temporal">
- <compilation-unit name="duration_functions">
- <output-dir compare="Text">duration_functions</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="temporal">
- <compilation-unit name="overlap_bins_gby_0">
- <output-dir compare="Text">overlap_bins_gby_0</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="temporal">
- <compilation-unit name="get_overlapping_interval">
- <output-dir compare="Text">get_overlapping_interval</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="temporal">
- <compilation-unit name="overlap_bins">
- <output-dir compare="Text">overlap_bins</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="temporal">
- <compilation-unit name="parse_02">
- <output-dir compare="Text">parse_02</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="temporal">
- <compilation-unit name="parse_01">
- <output-dir compare="Text">parse_01</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="temporal">
- <compilation-unit name="day_of_week_01">
- <output-dir compare="Text">day_of_week_01</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="temporal">
- <compilation-unit name="interval_bin">
- <output-dir compare="Text">interval_bin</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="temporal">
- <compilation-unit name="interval_bin_gby_0">
- <output-dir compare="Text">interval_bin_gby_0</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="temporal">
- <compilation-unit name="interval_bin_gby_1">
- <output-dir compare="Text">interval_bin_gby_1</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="temporal">
- <compilation-unit name="accessors">
- <output-dir compare="Text">accessors</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="temporal">
- <compilation-unit name="accessors_interval">
- <output-dir compare="Text">accessors_interval</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="temporal">
- <compilation-unit name="accessors_interval_null">
- <output-dir compare="Text">accessors_interval_null</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="temporal">
- <compilation-unit name="adjust_timezone">
- <output-dir compare="Text">adjust_timezone</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="temporal">
- <compilation-unit name="calendar_duration">
- <output-dir compare="Text">calendar_duration</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="temporal">
- <compilation-unit name="date_functions">
- <output-dir compare="Text">date_functions</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="temporal">
- <compilation-unit name="datetime_functions">
- <output-dir compare="Text">datetime_functions</output-dir>
- </compilation-unit>
- </test-case>
- <!--
- <test-case FilePath="temporal">
- <compilation-unit name="insert_from_delimited_ds">
- <output-dir compare="Text">insert_from_delimited_ds</output-dir>
- </compilation-unit>
- </test-case>
- -->
- <test-case FilePath="temporal">
- <compilation-unit name="insert_from_ext_ds">
- <output-dir compare="Text">insert_from_ext_ds</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="temporal">
- <compilation-unit name="insert_from_ext_ds_2">
- <output-dir compare="Text">insert_from_ext_ds_2</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="temporal">
- <compilation-unit name="interval_functions">
- <output-dir compare="Text">interval_functions</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="temporal">
- <compilation-unit name="time_functions">
- <output-dir compare="Text">time_functions</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="constructor">
- <compilation-unit name="interval">
- <output-dir compare="Text">interval</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="temporal">
- <compilation-unit name="duration_comps">
- <output-dir compare="Text">duration_comps</output-dir>
- </compilation-unit>
- </test-case>
+ &TemporalQueries;
</test-group>
<test-group name="leftouterjoin">
<test-case FilePath="leftouterjoin">
diff --git a/asterix-aql/src/main/java/org/apache/asterix/aql/util/RangeMapBuilder.java b/asterix-aql/src/main/java/org/apache/asterix/aql/util/RangeMapBuilder.java
index b18c317..cf20a48 100644
--- a/asterix-aql/src/main/java/org/apache/asterix/aql/util/RangeMapBuilder.java
+++ b/asterix-aql/src/main/java/org/apache/asterix/aql/util/RangeMapBuilder.java
@@ -63,7 +63,9 @@
AQLParser parser = new AQLParser((String) hint);
List<Statement> hintStatements = parser.parse();
- if (hintStatements.size() != 1) {
+ if (hintStatements.size() == 0) {
+ throw new ParseException("No range hint was supplied to the RangeMapBuilder.");
+ } else if (hintStatements.size() != 1) {
throw new ParseException("Only one range statement is allowed for the range hint.");
}
diff --git a/asterix-aql/src/main/javacc/AQL.jj b/asterix-aql/src/main/javacc/AQL.jj
index 0aab549..f1b1b85 100644
--- a/asterix-aql/src/main/javacc/AQL.jj
+++ b/asterix-aql/src/main/javacc/AQL.jj
@@ -116,6 +116,7 @@
import org.apache.asterix.common.annotations.FieldValFileSameIndexDataGen;
import org.apache.asterix.common.annotations.IRecordFieldDataGen;
import org.apache.asterix.common.annotations.InsertRandIntDataGen;
+import org.apache.asterix.common.annotations.IntervalJoinExpressionAnnotation;
import org.apache.asterix.common.annotations.ListDataGen;
import org.apache.asterix.common.annotations.ListValFileDataGen;
import org.apache.asterix.common.annotations.SkipSecondaryIndexSearchExpressionAnnotation;
@@ -146,7 +147,7 @@
private static final String DATETIME_ADD_RAND_HOURS_HINT = "datetime-add-rand-hours";
private static final String DATETIME_BETWEEN_YEARS_HINT = "datetime-between-years";
private static final String HASH_GROUP_BY_HINT = "hash";
- private static final String INDEXED_NESTED_LOOP_JOIN_HINT = "indexnl";
+ private static final String INDEXED_NESTED_LOOP_JOIN_HINT = IndexedNLJoinExpressionAnnotation.HINT_STRING;
private static final String INMEMORY_HINT = "inmem";
private static final String INSERT_RAND_INT_HINT = "insert-rand-int";
private static final String INTERVAL_HINT = "interval";
@@ -442,7 +443,7 @@
| ("internal" | "temporary" {
temp = token.image.toLowerCase().equals("temporary");
}
- )?
+ )?
<DATASET> nameComponents = QualifiedName()
<LEFTPAREN> typeName = Identifier() <RIGHTPAREN>
ifNotExists = IfNotExists()
@@ -656,7 +657,7 @@
FunctionSignature appliedFunction = null;
CreateFeedStatement cfs = null;
Pair<Identifier,Identifier> sourceNameComponents = null;
-
+
}
{
(
@@ -668,7 +669,7 @@
}
|
("primary")? "feed" nameComponents = QualifiedName() ifNotExists = IfNotExists()
- "using" adapterName = AdapterName() properties = Configuration() (appliedFunction = ApplyFunction())?
+ "using" adapterName = AdapterName() properties = Configuration() (appliedFunction = ApplyFunction())?
{
cfs = new CreatePrimaryFeedStatement(nameComponents,
adapterName, properties, appliedFunction, ifNotExists);
@@ -681,8 +682,8 @@
CreateFeedPolicyStatement FeedPolicySpecification() throws ParseException:
{
- String policyName = null;
- String basePolicyName = null;
+ String policyName = null;
+ String basePolicyName = null;
String sourcePolicyFile = null;
String definition = null;
boolean ifNotExists = false;
@@ -692,18 +693,18 @@
{
(
"ingestion" "policy" policyName = Identifier() ifNotExists = IfNotExists()
- <FROM>
- ("policy" basePolicyName = Identifier() properties = Configuration() ("definition" definition = StringLiteral())?
+ <FROM>
+ ("policy" basePolicyName = Identifier() properties = Configuration() ("definition" definition = StringLiteral())?
{
cfps = new CreateFeedPolicyStatement(policyName,
basePolicyName, properties, definition, ifNotExists);
}
- | "path" sourcePolicyFile = Identifier() ("definition" definition = StringLiteral())?
+ | "path" sourcePolicyFile = Identifier() ("definition" definition = StringLiteral())?
{
cfps = new CreateFeedPolicyStatement(policyName, sourcePolicyFile, definition, ifNotExists);
}
- )
-
+ )
+
)
{
return cfps;
@@ -2038,7 +2039,13 @@
}
callExpr = new CallExpr(signature,argList);
if (hint != null) {
- if (hint.startsWith(INDEXED_NESTED_LOOP_JOIN_HINT)) {
+ if (IntervalJoinExpressionAnnotation.isIntervalJoinHint(hint)) {
+ IntervalJoinExpressionAnnotation ijea = IntervalJoinExpressionAnnotation.INSTANCE;
+ ijea.setObject(hint);
+ ijea.setJoinType(hint);
+ ijea.setRangeMap(RangeMapBuilder.parseHint(hint.substring(IntervalJoinExpressionAnnotation.getHintLength(hint))));
+ callExpr.addHint(ijea);
+ } else if (hint.startsWith(INDEXED_NESTED_LOOP_JOIN_HINT)) {
callExpr.addHint(IndexedNLJoinExpressionAnnotation.INSTANCE);
} else if (hint.startsWith(SKIP_SECONDARY_INDEX_SEARCH_HINT)) {
callExpr.addHint(SkipSecondaryIndexSearchExpressionAnnotation.INSTANCE);
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/annotations/IntervalJoinExpressionAnnotation.java b/asterix-common/src/main/java/org/apache/asterix/common/annotations/IntervalJoinExpressionAnnotation.java
new file mode 100644
index 0000000..de1ee1b
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/annotations/IntervalJoinExpressionAnnotation.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.annotations;
+
+import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionAnnotation;
+import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
+
+public class IntervalJoinExpressionAnnotation implements IExpressionAnnotation {
+
+ public static final String IOP_HINT_STRING = "interval-iop-join";
+ public static final String MERGE_HINT_STRING = "interval-merge-join";
+ public static final String SPATIAL_HINT_STRING = "interval-spatial-join";
+ public static final IntervalJoinExpressionAnnotation INSTANCE = new IntervalJoinExpressionAnnotation();
+
+ private Object object;
+ private IRangeMap map;
+ private String joinType;
+
+ @Override
+ public Object getObject() {
+ return object;
+ }
+
+ @Override
+ public void setObject(Object object) {
+ this.object = object;
+ }
+
+ @Override
+ public IExpressionAnnotation copy() {
+ IntervalJoinExpressionAnnotation clone = new IntervalJoinExpressionAnnotation();
+ clone.setObject(object);
+ return clone;
+ }
+
+ public void setRangeMap(IRangeMap map) {
+ this.map = map;
+ }
+
+ public IRangeMap getRangeMap() {
+ return map;
+ }
+
+ public void setJoinType(String hint) {
+ if (hint.startsWith(IOP_HINT_STRING)) {
+ joinType = IOP_HINT_STRING;
+ } else if (hint.startsWith(MERGE_HINT_STRING)) {
+ joinType = MERGE_HINT_STRING;
+ } else if (hint.startsWith(SPATIAL_HINT_STRING)) {
+ joinType = SPATIAL_HINT_STRING;
+ }
+ }
+
+ public String getRangeType() {
+ return joinType;
+ }
+
+ public boolean isIopJoin() {
+ if (IOP_HINT_STRING.equals(joinType)) {
+ return true;
+ }
+ return false;
+ }
+
+ public boolean isMergeJoin() {
+ if (MERGE_HINT_STRING.equals(joinType)) {
+ return true;
+ }
+ return false;
+ }
+
+ public boolean isSpatialJoin() {
+ if (SPATIAL_HINT_STRING.equals(joinType)) {
+ return true;
+ }
+ return false;
+ }
+
+ public static boolean isIntervalJoinHint(String hint) {
+ if (hint.startsWith(IOP_HINT_STRING) || hint.startsWith(MERGE_HINT_STRING)
+ || hint.startsWith(SPATIAL_HINT_STRING)) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ public static int getHintLength(String hint) {
+ if (hint.startsWith(IOP_HINT_STRING)) {
+ return IOP_HINT_STRING.length();
+ } else if (hint.startsWith(MERGE_HINT_STRING)) {
+ return MERGE_HINT_STRING.length();
+ } else if (hint.startsWith(SPATIAL_HINT_STRING)) {
+ return SPATIAL_HINT_STRING.length();
+ }
+ return 0;
+ }
+
+}
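Reviewer note: a standalone sketch of how the hint text is split into a join type and a range-map string, mirroring the startsWith and substring logic in IntervalJoinExpressionAnnotation and the call site in AQL.jj. It assumes the hint string handed to the parser starts with the hint name (the comment delimiters already removed). The class and method names are hypothetical.

```java
// Sketch only: mirrors the prefix checks in the patch without depending on AsterixDB classes.
class IntervalHintSketch {

    static final String[] HINT_NAMES = { "interval-iop-join", "interval-merge-join",
            "interval-spatial-join" };

    // Returns the matching hint name, or null when the text is not an interval join hint.
    static String joinType(String hint) {
        for (String name : HINT_NAMES) {
            if (hint.startsWith(name)) {
                return name;
            }
        }
        return null;
    }

    // Returns what follows the hint name; the patch passes this remainder to the range map parser.
    static String rangeText(String hint) {
        String name = joinType(hint);
        return name == null ? hint : hint.substring(name.length());
    }
}
```

As in the patch, the remainder keeps its leading space, so whatever parses it has to tolerate leading whitespace.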
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/annotations/SkipSecondaryIndexSearchExpressionAnnotation.java b/asterix-common/src/main/java/org/apache/asterix/common/annotations/SkipSecondaryIndexSearchExpressionAnnotation.java
index d6da48c..519458e 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/annotations/SkipSecondaryIndexSearchExpressionAnnotation.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/annotations/SkipSecondaryIndexSearchExpressionAnnotation.java
@@ -22,7 +22,7 @@
public class SkipSecondaryIndexSearchExpressionAnnotation implements IExpressionAnnotation {
- public static final String SKIP_SECONDARY_INDEX_SEARCH_ANNOTATION_KEY = "skip-index";
+ public static final String HINT_STRING = "skip-index";
public static final SkipSecondaryIndexSearchExpressionAnnotation INSTANCE = new SkipSecondaryIndexSearchExpressionAnnotation();
private Object object;
diff --git a/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AIntervalPartialBinaryComparatorFactory.java b/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AIntervalPartialBinaryComparatorFactory.java
index f7c4428..aa38010 100644
--- a/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AIntervalPartialBinaryComparatorFactory.java
+++ b/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AIntervalPartialBinaryComparatorFactory.java
@@ -42,19 +42,20 @@
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
- int c = Double.compare(
+ int c = Long.compare(
AInt64SerializerDeserializer.getLong(b1,
s1 + AIntervalSerializerDeserializer.getIntervalStartOffset()),
AInt64SerializerDeserializer.getLong(b2,
s2 + AIntervalSerializerDeserializer.getIntervalStartOffset()));
if (c == 0) {
- c = Double.compare(
+ c = Long.compare(
AInt64SerializerDeserializer.getLong(b1,
s1 + AIntervalSerializerDeserializer.getIntervalEndOffset()),
AInt64SerializerDeserializer.getLong(b2,
s2 + AIntervalSerializerDeserializer.getIntervalEndOffset()));
if (c == 0) {
- c = Byte.compare(b1[s1 + 16], b2[s2 + 16]);
+ c = Byte.compare(b1[s1 + AIntervalSerializerDeserializer.getIntervalTagOffset()], b2[s2
+ + AIntervalSerializerDeserializer.getIntervalTagOffset()]);
}
}
return c;
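Reviewer note: the switch from Double.compare to Long.compare matters once values exceed 2^53, because widening a long to a double can round distinct values to the same number. A minimal illustration, with hypothetical names:

```java
// Sketch only: shows the precision loss that Long.compare avoids.
class LongVersusDoubleCompare {

    // Both arguments are implicitly widened to double, which keeps only 53 bits of precision.
    static int viaDouble(long a, long b) {
        return Double.compare(a, b);
    }

    // Exact comparison on the 64-bit values.
    static int viaLong(long a, long b) {
        return Long.compare(a, b);
    }

    public static void main(String[] args) {
        long a = 1L << 53;
        long b = a + 1;
        System.out.println("Double.compare: " + viaDouble(a, b));
        System.out.println("Long.compare:   " + viaLong(a, b));
    }
}
```

The epoch-millisecond values in the tests of this change are far below 2^53, so they would compare correctly either way. The fix is about correctness across the full long range.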
diff --git a/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/allenrelations/AllenRelationsBinaryComparatorFactoryProvider.java b/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/allenrelations/AllenRelationsBinaryComparatorFactoryProvider.java
new file mode 100644
index 0000000..564a793
--- /dev/null
+++ b/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/allenrelations/AllenRelationsBinaryComparatorFactoryProvider.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.dataflow.data.nontagged.comparators.allenrelations;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class AllenRelationsBinaryComparatorFactoryProvider implements IBinaryComparatorFactoryProvider, Serializable {
+
+ private static final long serialVersionUID = 1L;
+ public static final AllenRelationsBinaryComparatorFactoryProvider INSTANCE = new AllenRelationsBinaryComparatorFactoryProvider();
+
+ private AllenRelationsBinaryComparatorFactoryProvider() {
+ }
+
+ @Override
+ public IBinaryComparatorFactory getBinaryComparatorFactory(Object type, boolean ascending) {
+ // Every type currently maps to OverlapIntervalBinaryComparatorFactory;
+ // the type and ascending arguments are ignored.
+ return OverlapIntervalBinaryComparatorFactory.INSTANCE;
+ }
+
+ public IBinaryComparatorFactory getBinaryComparatorFactory(FunctionIdentifier fid, boolean ascending) {
+ return OverlapIntervalBinaryComparatorFactory.INSTANCE;
+ }
+
+}
diff --git a/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/allenrelations/OverlapIntervalBinaryComparatorFactory.java b/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/allenrelations/OverlapIntervalBinaryComparatorFactory.java
new file mode 100644
index 0000000..f0443b7
--- /dev/null
+++ b/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/allenrelations/OverlapIntervalBinaryComparatorFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.dataflow.data.nontagged.comparators.allenrelations;
+
+import org.apache.asterix.dataflow.data.nontagged.serde.AIntervalSerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class OverlapIntervalBinaryComparatorFactory implements IBinaryComparatorFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final OverlapIntervalBinaryComparatorFactory INSTANCE = new OverlapIntervalBinaryComparatorFactory();
+
+ private OverlapIntervalBinaryComparatorFactory() {
+
+ }
+
+ @Override
+ public IBinaryComparator createBinaryComparator() {
+ return new IBinaryComparator() {
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ long start0 = AIntervalSerializerDeserializer.getIntervalStart(b1, s1);
+ long end0 = AIntervalSerializerDeserializer.getIntervalEnd(b1, s1);
+ long start1 = AIntervalSerializerDeserializer.getIntervalStart(b2, s2);
+ long end1 = AIntervalSerializerDeserializer.getIntervalEnd(b2, s2);
+
+ if (start0 < start1 && end0 > start1 && end1 > end0) {
+ // These intervals overlap
+ return 0;
+ }
+ if (end0 < start1) {
+ return 1;
+ }
+ return -1;
+ }
+ };
+ }
+}
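Reviewer note: to make the return values of the overlap comparator easier to reason about, here is the same branch logic on plain long endpoints rather than serialized intervals. The class name is hypothetical.

```java
// Sketch only: the comparator's branch logic applied to plain endpoints.
class OverlapCompareSketch {

    static int compare(long start0, long end0, long start1, long end1) {
        if (start0 < start1 && end0 > start1 && end1 > end0) {
            return 0; // first interval overlaps the second
        }
        if (end0 < start1) {
            return 1; // first interval ends before the second starts
        }
        return -1; // every other arrangement
    }
}
```

For a contained pair such as [2002, 2010] and [2007, 2009], compare returns -1 in both argument orders. The result therefore behaves as a match test with a direction hint rather than as a total order, which is worth keeping in mind wherever the comparator is used for sorting.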
diff --git a/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/rangeinterval/RangeIntervalProjectBinaryComparatorFactory.java b/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/rangeinterval/RangeIntervalProjectBinaryComparatorFactory.java
new file mode 100644
index 0000000..7a358f2
--- /dev/null
+++ b/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/rangeinterval/RangeIntervalProjectBinaryComparatorFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.dataflow.data.nontagged.comparators.rangeinterval;
+
+import org.apache.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.AIntervalSerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class RangeIntervalProjectBinaryComparatorFactory implements IBinaryComparatorFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final RangeIntervalProjectBinaryComparatorFactory INSTANCE = new RangeIntervalProjectBinaryComparatorFactory();
+
+ private RangeIntervalProjectBinaryComparatorFactory() {
+
+ }
+
+ /*
+ * The comparator uses the range map split value and an interval.
+ *
+ * -1: split point is less than the interval start point.
+ * 0: split point is equal to the interval start point
+ * 1: split point is greater than the interval start point.
+ */
+ @Override
+ public IBinaryComparator createBinaryComparator() {
+ return new IBinaryComparator() {
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return Long.compare(AInt64SerializerDeserializer.getLong(b1, s1), AInt64SerializerDeserializer
+ .getLong(b2, s2 + AIntervalSerializerDeserializer.getIntervalStartOffset()));
+ }
+ };
+ }
+}
diff --git a/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/rangeinterval/RangeIntervalReplicateBinaryComparatorFactory.java b/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/rangeinterval/RangeIntervalReplicateBinaryComparatorFactory.java
new file mode 100644
index 0000000..19fce56
--- /dev/null
+++ b/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/rangeinterval/RangeIntervalReplicateBinaryComparatorFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.dataflow.data.nontagged.comparators.rangeinterval;
+
+import org.apache.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.AIntervalSerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class RangeIntervalReplicateBinaryComparatorFactory implements IBinaryComparatorFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final RangeIntervalReplicateBinaryComparatorFactory INSTANCE = new RangeIntervalReplicateBinaryComparatorFactory();
+
+ private RangeIntervalReplicateBinaryComparatorFactory() {
+
+ }
+
+ /*
+ * The comparator uses the range map split value and an interval.
+ *
+ * -1: split point is less than the interval start point.
+ * 0: split point is equal to or greater than the interval start point
+ * 1: never happens
+ */
+ @Override
+ public IBinaryComparator createBinaryComparator() {
+ return new IBinaryComparator() {
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ int c = Long.compare(AInt64SerializerDeserializer.getLong(b1, s1), AInt64SerializerDeserializer
+ .getLong(b2, s2 + AIntervalSerializerDeserializer.getIntervalStartOffset()));
+ if (c > 0) {
+ c = 0;
+ }
+ return c;
+ }
+ };
+ }
+
+}
diff --git a/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/rangeinterval/RangeIntervalSplitBinaryComparatorFactory.java b/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/rangeinterval/RangeIntervalSplitBinaryComparatorFactory.java
new file mode 100644
index 0000000..8fc2b54
--- /dev/null
+++ b/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/rangeinterval/RangeIntervalSplitBinaryComparatorFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.dataflow.data.nontagged.comparators.rangeinterval;
+
+import org.apache.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.AIntervalSerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class RangeIntervalSplitBinaryComparatorFactory implements IBinaryComparatorFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final RangeIntervalSplitBinaryComparatorFactory INSTANCE = new RangeIntervalSplitBinaryComparatorFactory();
+
+ private RangeIntervalSplitBinaryComparatorFactory() {
+
+ }
+
+ /*
+ * The comparator uses the range map split value and an interval.
+ *
+ * -1: split point is less than the interval start point.
+ * 0: split point is in the interval
+ * 1: split point is greater than the interval end point.
+ */
+ @Override
+ public IBinaryComparator createBinaryComparator() {
+ return new IBinaryComparator() {
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ int c = Long.compare(AInt64SerializerDeserializer.getLong(b1, s1), AInt64SerializerDeserializer
+ .getLong(b2, s2 + AIntervalSerializerDeserializer.getIntervalStartOffset()));
+ if (c > 0) {
+ c = Long.compare(AInt64SerializerDeserializer.getLong(b1, s1), AInt64SerializerDeserializer
+ .getLong(b2, s2 + AIntervalSerializerDeserializer.getIntervalEndOffset()));
+ if (c < 0) {
+ c = 0;
+ }
+ }
+ return c;
+ }
+ };
+ }
+
+}
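
Review note: the three range-interval comparators (project, replicate, split) differ only in how they map a split point against an interval onto -1, 0, and 1. The sketch below restates the documented behavior on plain long values so the three can be compared side by side. It is an illustration under that assumption, not the patch's code; the class and method names are hypothetical.

```java
// Illustrative sketch only: plain long values stand in for the serialized
// split point and interval that the real comparators read from byte arrays.
final class RangeIntervalComparatorSketch {

    // Project: compare the split point with the interval start only.
    static int project(long split, long start, long end) {
        return Long.compare(split, start);
    }

    // Replicate: like project, but a split point past the start is reported as 0.
    static int replicate(long split, long start, long end) {
        return Math.min(Long.compare(split, start), 0);
    }

    // Split: 0 while the split point lies within [start, end],
    // positive only once it is past the interval end.
    static int split(long split, long start, long end) {
        int c = Long.compare(split, start);
        if (c > 0) {
            c = Long.compare(split, end);
            if (c < 0) {
                c = 0;
            }
        }
        return c;
    }

    public static void main(String[] args) {
        long start = 10;
        long end = 20;
        for (long s : new long[] { 5, 10, 15, 20, 25 }) {
            System.out.println(s + ": project=" + project(s, start, end)
                    + " replicate=" + replicate(s, start, end)
                    + " split=" + split(s, start, end));
        }
    }
}
```

For a split point strictly inside the interval, project reports a positive value while replicate and split both report 0; only split distinguishes a point past the interval end.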
diff --git a/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AIntervalSerializerDeserializer.java b/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AIntervalSerializerDeserializer.java
index 5300b28..d2735b4 100644
--- a/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AIntervalSerializerDeserializer.java
+++ b/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AIntervalSerializerDeserializer.java
@@ -85,13 +85,17 @@
return 8;
}
+ public static int getIntervalTagOffset() {
+ return 16;
+ }
+
public static byte getIntervalTimeType(byte[] data, int offset) {
return data[offset + 8 * 2];
}
/**
* create an interval value from two given datetime instance.
- *
+ *
* @param interval
* @param out
* @throws HyracksDataException
diff --git a/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/AIntervalPointable.java b/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/AIntervalPointable.java
new file mode 100644
index 0000000..73af3d2
--- /dev/null
+++ b/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/AIntervalPointable.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.om.pointables.nonvisitor;
+
+import org.apache.asterix.om.util.container.IObjectFactory;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.data.std.api.AbstractPointable;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.api.IPointableFactory;
+import org.apache.hyracks.data.std.primitive.BytePointable;
+import org.apache.hyracks.data.std.primitive.LongPointable;
+
+/*
+ * This class interprets the binary data representation of an interval.
+ *
+ * The interval can be time, date, or datetime defined by the tag.
+ *
+ * Interval {
+ * long startPoint;
+ * long endPoint;
+ * byte tag;
+ * }
+ */
+public class AIntervalPointable extends AbstractPointable {
+
+ public static final ITypeTraits TYPE_TRAITS = new ITypeTraits() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean isFixedLength() {
+ return true;
+ }
+
+ @Override
+ public int getFixedLength() {
+ return 17;
+ }
+ };
+
+ public static final IPointableFactory FACTORY = new IPointableFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IPointable createPointable() {
+ return new AIntervalPointable();
+ }
+
+ @Override
+ public ITypeTraits getTypeTraits() {
+ return TYPE_TRAITS;
+ }
+ };
+
+ public static final IObjectFactory<IPointable, String> ALLOCATOR = new IObjectFactory<IPointable, String>() {
+ public IPointable create(String id) {
+ return new AIntervalPointable();
+ }
+ };
+
+ private static final int TAG_SIZE = 1;
+ private static final int START_LENGTH_SIZE = 8;
+ private static final int END_LENGTH_SIZE = 8;
+
+ public long getStart() {
+ return LongPointable.getLong(bytes, getStartOffset());
+ }
+
+ public int getStartOffset() {
+ return start;
+ }
+
+ public int getStartSize() {
+ return START_LENGTH_SIZE;
+ }
+
+ public long getEnd() {
+ return LongPointable.getLong(bytes, getEndOffset());
+ }
+
+ public int getEndOffset() {
+ return getStartOffset() + getStartSize();
+ }
+
+ public int getEndSize() {
+ return END_LENGTH_SIZE;
+ }
+
+ public byte getTag() {
+ return BytePointable.getByte(bytes, getTagOffset());
+ }
+
+ public int getTagOffset() {
+ return getEndOffset() + getEndSize();
+ }
+
+ public int getTagSize() {
+ return TAG_SIZE;
+ }
+
+}
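
Review note: AIntervalPointable implies a fixed 17-byte layout, with an 8-byte start at offset 0, an 8-byte end at offset 8, and a 1-byte tag at offset 16. The sketch below reproduces that layout with java.nio.ByteBuffer, assuming big-endian long encoding; the class name, helper names, and the tag value used in main are hypothetical.

```java
import java.nio.ByteBuffer;

// Illustrative only: mirrors the 8 + 8 + 1 byte interval layout with plain JDK calls.
final class IntervalLayoutSketch {
    static final int START_OFFSET = 0;
    static final int END_OFFSET = 8;
    static final int TAG_OFFSET = 16;
    static final int LENGTH = 17;

    // Serialize start, end, and tag back to back (ByteBuffer is big-endian by default).
    static byte[] write(long start, long end, byte tag) {
        ByteBuffer buf = ByteBuffer.allocate(LENGTH);
        buf.putLong(start).putLong(end).put(tag);
        return buf.array();
    }

    static long start(byte[] bytes, int offset) {
        return ByteBuffer.wrap(bytes).getLong(offset + START_OFFSET);
    }

    static long end(byte[] bytes, int offset) {
        return ByteBuffer.wrap(bytes).getLong(offset + END_OFFSET);
    }

    static byte tag(byte[] bytes, int offset) {
        return ByteBuffer.wrap(bytes).get(offset + TAG_OFFSET);
    }

    public static void main(String[] args) {
        byte hypotheticalTag = 1; // placeholder, not a real type tag value
        byte[] data = write(100L, 250L, hypotheticalTag);
        System.out.println(data.length + " bytes: start=" + start(data, 0)
                + " end=" + end(data, 0) + " tag=" + tag(data, 0));
    }
}
```

The offsets agree with getIntervalTagOffset() returning 16 and getFixedLength() returning 17 in this change.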
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/interval/SortMergeIntervalJoinLocks.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/interval/SortMergeIntervalJoinLocks.java
new file mode 100644
index 0000000..f23bd55
--- /dev/null
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/interval/SortMergeIntervalJoinLocks.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.operators.interval;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class SortMergeIntervalJoinLocks implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final List<Lock> lock = new ArrayList<Lock>();
+ private final List<Condition> left = new ArrayList<Condition>();
+ private final List<Condition> right = new ArrayList<Condition>();
+
+ public synchronized void setPartitions(int partitions) {
+ for (int i = lock.size(); i < partitions; ++i) {
+ lock.add(new ReentrantLock());
+ left.add(lock.get(i).newCondition());
+ right.add(lock.get(i).newCondition());
+ }
+ }
+
+ public Lock getLock(int partition) {
+ return lock.get(partition);
+ }
+
+ public Condition getLeft(int partition) {
+ return left.get(partition);
+ }
+
+ public Condition getRight(int partition) {
+ return right.get(partition);
+ }
+}
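
Review note: SortMergeIntervalJoinLocks pairs one lock per partition with a "left" and a "right" condition, which the operators use to hand frames from the right branch to the left. The sketch below shows the general shape of such a handshake for a single partition, with waits guarded by loops. It is an assumption-laden illustration rather than the patch's protocol; the class, fields, and methods are hypothetical.

```java
import java.util.Arrays;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative only: one lock, two conditions, one "frame" handed over per request.
final class TwoConditionHandshakeSketch {
    private final Lock lock = new ReentrantLock();
    private final Condition left = lock.newCondition();
    private final Condition right = lock.newCondition();
    private boolean frameRequested = false;
    private Integer frame = null;

    // Left side: ask for a frame and wait until the right side delivers it.
    int requestFrame() throws InterruptedException {
        lock.lock();
        try {
            frameRequested = true;
            right.signal();
            while (frame == null) {
                left.await();
            }
            int value = frame;
            frame = null;
            return value;
        } finally {
            lock.unlock();
        }
    }

    // Right side: wait for a request, then hand over one frame.
    void deliverFrame(int value) throws InterruptedException {
        lock.lock();
        try {
            while (!frameRequested) {
                right.await();
            }
            frameRequested = false;
            frame = value;
            left.signal();
        } finally {
            lock.unlock();
        }
    }

    // Runs a producer thread against the calling thread and returns what was received.
    static int[] run(int count) {
        TwoConditionHandshakeSketch h = new TwoConditionHandshakeSketch();
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    h.deliverFrame(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int[] received = new int[count];
        try {
            for (int i = 0; i < count; i++) {
                received[i] = h.requestFrame();
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run(3)));
    }
}
```

Checking the shared flag inside a loop while holding the lock keeps a signal sent before the other side starts waiting from being lost.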
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/interval/SortMergeIntervalJoinOperatorDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/interval/SortMergeIntervalJoinOperatorDescriptor.java
new file mode 100644
index 0000000..de57f3f
--- /dev/null
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/interval/SortMergeIntervalJoinOperatorDescriptor.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.operators.interval;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.ActivityId;
+import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.TaskId;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
+import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
+import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputOperatorNodePushable;
+
+public class SortMergeIntervalJoinOperatorDescriptor extends AbstractOperatorDescriptor {
+ private static final int LEFT_ACTIVITY_ID = 0;
+ private static final int RIGHT_ACTIVITY_ID = 1;
+ private final IBinaryComparatorFactory[] comparatorFactories;
+ private final int[] keys0;
+ private final int[] keys1;
+ private final int memSize;
+
+ public SortMergeIntervalJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memSize,
+ RecordDescriptor recordDescriptor, int[] keys0, int[] keys1,
+ IBinaryComparatorFactory[] comparatorFactories) {
+ super(spec, 2, 1);
+ recordDescriptors[0] = recordDescriptor;
+ this.comparatorFactories = comparatorFactories;
+ this.keys0 = keys0;
+ this.keys1 = keys1;
+ this.memSize = memSize;
+ }
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void contributeActivities(IActivityGraphBuilder builder) {
+ SortMergeIntervalJoinLocks locks = new SortMergeIntervalJoinLocks();
+ ActivityId p1Aid = new ActivityId(odId, LEFT_ACTIVITY_ID);
+ ActivityId p2Aid = new ActivityId(odId, RIGHT_ACTIVITY_ID);
+ LeftActivityNode phase1 = new LeftActivityNode(p1Aid, p2Aid, locks);
+ RightActivityNode phase2 = new RightActivityNode(p2Aid, p1Aid, locks);
+
+ builder.addActivity(this, phase1);
+ builder.addSourceEdge(1, phase1, 0);
+
+ builder.addActivity(this, phase2);
+ builder.addSourceEdge(0, phase2, 0);
+
+ builder.addTargetEdge(0, phase2, 0);
+ }
+
+ public static class SortMergeIntervalJoinTaskState extends AbstractStateObject {
+ private SortMergeIntervalStatus status;
+ private SortMergeIntervalJoiner joiner;
+ private boolean failed;
+
+ private SortMergeIntervalJoinTaskState(JobId jobId, TaskId taskId) {
+ super(jobId, taskId);
+ status = new SortMergeIntervalStatus();
+ }
+
+ @Override
+ public void toBytes(DataOutput out) throws IOException {
+
+ }
+
+ @Override
+ public void fromBytes(DataInput in) throws IOException {
+ }
+ }
+
+ private class LeftActivityNode extends AbstractActivityNode {
+ private static final long serialVersionUID = 1L;
+
+ private final ActivityId joinAid;
+ private final SortMergeIntervalJoinLocks locks;
+
+ public LeftActivityNode(ActivityId id, ActivityId joinAid, SortMergeIntervalJoinLocks locks) {
+ super(id);
+ this.joinAid = joinAid;
+ this.locks = locks;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+ throws HyracksDataException {
+ locks.setPartitions(nPartitions);
+ final RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+ final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+ return new LeftOperator(ctx, partition, inRecordDesc, comparators);
+ }
+
+ private class LeftOperator extends AbstractUnaryOutputOperatorNodePushable {
+
+ private final IHyracksTaskContext ctx;
+ private final int partition;
+ private final IBinaryComparator[] comparators;
+ private final RecordDescriptor leftRD;
+
+ public LeftOperator(IHyracksTaskContext ctx, int partition, RecordDescriptor inRecordDesc,
+ IBinaryComparator[] comparators) {
+ this.ctx = ctx;
+ this.partition = partition;
+ this.leftRD = inRecordDesc;
+ this.comparators = comparators;
+ }
+
+ @Override
+ public int getInputArity() {
+ return inputArity;
+ }
+
+ @Override
+ public IFrameWriter getInputFrameWriter(int index) {
+ return new IFrameWriter() {
+ private SortMergeIntervalJoinTaskState state;
+ private boolean first = true;
+
+ @Override
+ public void open() throws HyracksDataException {
+ locks.getLock(partition).lock();
+ try {
+ writer.open();
+ state = new SortMergeIntervalJoinTaskState(ctx.getJobletContext().getJobId(),
+ new TaskId(getActivityId(), partition));
+ state.status.openLeft();
+ state.joiner = new SortMergeIntervalJoiner(ctx, memSize, partition, state.status, locks,
+ new FrameTuplePairComparator(keys0, keys1, comparators), writer, leftRD);
+ locks.getRight(partition).signal();
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ } finally {
+ locks.getLock(partition).unlock();
+ }
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ locks.getLock(partition).lock();
+ try {
+ if (first) {
+ state.status.dataLeft();
+ first = false;
+ }
+ state.joiner.setLeftFrame(buffer);
+ state.joiner.processMerge();
+ } finally {
+ locks.getLock(partition).unlock();
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ locks.getLock(partition).lock();
+ try {
+ state.failed = true;
+ } finally {
+ locks.getLock(partition).unlock();
+ }
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ locks.getLock(partition).lock();
+ try {
+ state.status.leftHasMore = false;
+ if (state.failed) {
+ writer.fail();
+ } else {
+ state.joiner.processMerge();
+ writer.close();
+ }
+ state.status.closeLeft();
+ } finally {
+ locks.getLock(partition).unlock();
+ }
+ }
+ };
+ }
+ }
+ }
+
+ private class RightActivityNode extends AbstractActivityNode {
+ private static final long serialVersionUID = 1L;
+
+ private final ActivityId joinAid;
+ private SortMergeIntervalJoinLocks locks;
+
+ public RightActivityNode(ActivityId id, ActivityId joinAid, SortMergeIntervalJoinLocks locks) {
+ super(id);
+ this.joinAid = joinAid;
+ this.locks = locks;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+ throws HyracksDataException {
+ locks.setPartitions(nPartitions);
+ RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+ return new RightOperator(ctx, partition, inRecordDesc);
+ }
+
+ private class RightOperator extends AbstractUnaryOutputOperatorNodePushable {
+
+ private int partition;
+ private IHyracksTaskContext ctx;
+ private final RecordDescriptor rightRD;
+
+ public RightOperator(IHyracksTaskContext ctx, int partition, RecordDescriptor inRecordDesc) {
+ this.ctx = ctx;
+ this.partition = partition;
+ this.rightRD = inRecordDesc;
+ }
+
+ @Override
+ public int getInputArity() {
+ return inputArity;
+ }
+
+ @Override
+ public IFrameWriter getInputFrameWriter(int index) {
+ return new IFrameWriter() {
+ private SortMergeIntervalJoinTaskState state;
+ private boolean first = true;
+
+ @Override
+ public void open() throws HyracksDataException {
+ locks.getLock(partition).lock();
+ try {
+ do {
+ // Wait for the state to be set in the context from Left.
+ state = (SortMergeIntervalJoinTaskState) ctx
+ .getStateObject(new TaskId(joinAid, partition));
+ if (state == null) {
+ locks.getRight(partition).await();
+ }
+ } while (state == null);
+ state.joiner.setRightRecordDescriptor(rightRD);
+ state.status.openRight();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException("RightOperator interrupted exception", e);
+ } finally {
+ locks.getLock(partition).unlock();
+ }
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ locks.getLock(partition).lock();
+ try {
+ if (first) {
+ state.status.dataRight();
+ first = false;
+ }
+ while (!state.status.loadRightFrame) {
+ // Wait for the state to request right frame.
+ locks.getRight(partition).await();
+ }
+ state.joiner.setRightFrame(buffer);
+ locks.getLeft(partition).signal();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException("RightOperator interrupted exception", e);
+ } finally {
+ locks.getLock(partition).unlock();
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ locks.getLock(partition).lock();
+ try {
+ state.failed = true;
+ } finally {
+ locks.getLock(partition).unlock();
+ }
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ locks.getLock(partition).lock();
+ try {
+ state.status.closeRight();
+ } finally {
+ locks.getLock(partition).unlock();
+ }
+ }
+ };
+ }
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/interval/SortMergeIntervalJoiner.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/interval/SortMergeIntervalJoiner.java
new file mode 100644
index 0000000..b5cdd19
--- /dev/null
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/interval/SortMergeIntervalJoiner.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.operators.interval;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.asterix.runtime.operators.interval.SortMergeIntervalStatus.BranchStatus;
+import org.apache.asterix.runtime.operators.interval.SortMergeIntervalStatus.RunFileStatus;
+import org.apache.hyracks.api.comm.FixedSizeFrame;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.io.RunFileReader;
+import org.apache.hyracks.dataflow.common.io.RunFileWriter;
+import org.apache.hyracks.dataflow.std.sort.buffermanager.IFramePool;
+import org.apache.hyracks.dataflow.std.sort.buffermanager.IFrameTupleBufferAccessor;
+import org.apache.hyracks.dataflow.std.sort.buffermanager.ITupleBufferManager;
+import org.apache.hyracks.dataflow.std.sort.buffermanager.VariableFramePool;
+import org.apache.hyracks.dataflow.std.sort.buffermanager.VariableTupleMemoryManager;
+import org.apache.hyracks.dataflow.std.structures.TuplePointer;
+
+public class SortMergeIntervalJoiner {
+
+ private static final int MEMORY_INDEX = -1;
+
+ private final FrameTupleAccessor accessorLeft;
+ private FrameTupleAccessor accessorRight;
+
+ private SortMergeIntervalJoinLocks locks;
+ private SortMergeIntervalStatus status;
+
+ private final IFrameWriter writer;
+
+ private ByteBuffer leftBuffer;
+ private ByteBuffer rightBuffer;
+ private int leftTupleIndex;
+ private int rightRunFileTupleIndex;
+ private int rightBufferTupleIndex;
+
+ private final ITupleBufferManager bufferManager;
+ private final List<TuplePointer> memoryTuples;
+ private final IFrameTupleBufferAccessor memoryAccessor;
+
+ private final IFrame runFileBuffer;
+ private final FrameTupleAppender runFileAppender;
+ private final RunFileWriter runFileWriter;
+ private RunFileReader runFileReader;
+ private IFrameTupleAccessor runFileAccessor;
+
+ private final FrameTupleAppender resultAppender;
+
+ private final FrameTuplePairComparator comparator;
+
+ private final int partition;
+
+ public SortMergeIntervalJoiner(IHyracksTaskContext ctx, int memorySize, int partition,
+ SortMergeIntervalStatus status, SortMergeIntervalJoinLocks locks, FrameTuplePairComparator comparator,
+ IFrameWriter writer, RecordDescriptor leftRd) throws HyracksDataException {
+ this.partition = partition;
+ this.status = status;
+ this.locks = locks;
+ this.writer = writer;
+ this.comparator = comparator;
+
+ accessorLeft = new FrameTupleAccessor(leftRd);
+
+ // Memory
+ IFramePool framePool = new VariableFramePool(ctx, (memorySize - 1) * ctx.getInitialFrameSize());
+ bufferManager = new VariableTupleMemoryManager(framePool, leftRd);
+ memoryTuples = new ArrayList<TuplePointer>();
+ memoryAccessor = bufferManager.getFrameTupleAccessor();
+
+ // Run File and frame cache
+ FileReference file = ctx.getJobletContext()
+ .createManagedWorkspaceFile(this.getClass().getSimpleName() + this.toString());
+ runFileWriter = new RunFileWriter(file, ctx.getIOManager());
+ runFileWriter.open();
+ runFileBuffer = new FixedSizeFrame(ctx.allocateFrame(ctx.getInitialFrameSize()));
+ runFileAppender = new FrameTupleAppender(new VSizeFrame(ctx));
+
+ // Result
+ resultAppender = new FrameTupleAppender(new VSizeFrame(ctx));
+ }
+
+ private boolean addToMemory(IFrameTupleAccessor accessor, int idx) throws HyracksDataException {
+ TuplePointer tuplePointer = new TuplePointer();
+ if (bufferManager.insertTuple(accessor, idx, tuplePointer)) {
+ memoryTuples.add(tuplePointer);
+ return true;
+ }
+ return false;
+ }
+
+ private void addToResult(IFrameTupleAccessor accessor1, int index1, IFrameTupleAccessor accessor2, int index2)
+ throws HyracksDataException {
+ FrameUtils.appendConcatToWriter(writer, resultAppender, accessor1, index1, accessor2, index2);
+ }
+
+ private void addToRunFile(IFrameTupleAccessor accessor, int idx) throws HyracksDataException {
+ if (!runFileAppender.append(accessor, idx)) {
+ runFileAppender.flush(runFileWriter, true);
+ runFileAppender.append(accessor, idx);
+ }
+ }
+
+ private void openFromRunFile() throws HyracksDataException {
+ status.runFileStatus = RunFileStatus.READING;
+
+ // Create reader
+ runFileReader = runFileWriter.createReader();
+ runFileReader.open();
+ rightRunFileTupleIndex = 0;
+
+ // Load first frame
+ runFileReader.nextFrame(runFileBuffer);
+ accessorRight.reset(runFileBuffer.getBuffer());
+ }
+
+ private void closeFromRunFile() throws HyracksDataException {
+ status.runFileStatus = RunFileStatus.NOT_USED;
+ runFileReader.close();
+ }
+
+ private void flushMemory() throws HyracksDataException {
+ bufferManager.reset();
+ memoryTuples.clear();
+ }
+
+ private void incrementLeftTuple() {
+ leftTupleIndex++;
+ }
+
+ private int getRightTupleIndex() throws HyracksDataException {
+ if (status.runFileStatus == RunFileStatus.READING) {
+ return rightRunFileTupleIndex;
+ } else {
+ return rightBufferTupleIndex;
+ }
+ }
+
+ private void incrementRightTuple() throws HyracksDataException {
+ if (status.runFileStatus == RunFileStatus.READING) {
+ ++rightRunFileTupleIndex;
+ } else {
+ rightBufferTupleIndex++;
+ }
+ }
+
+ /**
+ * Ensures a frame exists for the right branch, either from memory or the run file.
+ *
+ * @throws HyracksDataException
+ */
+ private boolean loadRightTuple() throws HyracksDataException {
+ boolean loaded = true;
+ if (status.runFileStatus == RunFileStatus.READING) {
+ if (rightRunFileTupleIndex >= accessorRight.getTupleCount()) {
+ if (runFileReader.nextFrame(runFileBuffer)) {
+ accessorRight.reset(runFileBuffer.getBuffer());
+ rightRunFileTupleIndex = 0;
+ } else {
+ closeFromRunFile();
+ return loadRightTuple();
+ }
+ }
+ } else {
+ if (rightBufferTupleIndex >= accessorRight.getTupleCount()) {
+ status.loadRightFrame = true;
+ locks.getRight(partition).signal();
+ try {
+ while (status.loadRightFrame && status.getRightStatus() == BranchStatus.DATA_PROCESSING) {
+ locks.getLeft(partition).await();
+ }
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(
+ "SortMergeIntervalJoin interrupted exception while attempting to load right tuple", e);
+ }
+ status.loadRightFrame = false;
+ loaded = (rightBufferTupleIndex == 0);
+ if (!loaded) {
+ status.rightHasMore = false;
+ }
+ }
+ }
+ return loaded;
+ }
+
+ /**
+ * Reports whether the left branch can continue: returns false only when the left branch is still receiving data and the current left frame is exhausted.
+ *
+ * @throws HyracksDataException
+ */
+ private boolean loadLeftTuple() throws HyracksDataException {
+ if (status.getLeftStatus() == BranchStatus.DATA_PROCESSING && leftTupleIndex >= accessorLeft.getTupleCount()) {
+ return false;
+ }
+ return true;
+ }
+
+ // memory management
+ private boolean memoryHasTuples() {
+ return bufferManager.getNumTuples() > 0;
+ }
+
+ public void processMerge() throws HyracksDataException {
+ // Ensure right tuple loaded into accessorRight
+ while (loadRightTuple() && status.rightHasMore) {
+ // *********************
+ // Left side from memory
+ // *********************
+ if (status.reloadingLeftFrame) {
+ // Skip the right frame memory processing.
+ status.reloadingLeftFrame = false;
+ } else {
+ if (status.runFileStatus == RunFileStatus.WRITING) {
+ // Write right tuple to run file
+ addToRunFile(accessorRight, getRightTupleIndex());
+ }
+
+ for (Iterator<TuplePointer> memoryIterator = memoryTuples.iterator(); memoryIterator.hasNext();) {
+ TuplePointer tp = memoryIterator.next();
+ memoryAccessor.reset(tp);
+ int c = comparator.compare(memoryAccessor, MEMORY_INDEX, accessorRight, getRightTupleIndex());
+ if (c < 0) {
+ // remove from memory
+ bufferManager.deleteTuple(tp);
+ memoryIterator.remove();
+ }
+ if (c == 0) {
+ // add to result
+ addToResult(memoryAccessor, MEMORY_INDEX, accessorRight, getRightTupleIndex());
+ }
+ }
+
+ if (!memoryHasTuples() && status.runFileStatus == RunFileStatus.WRITING) {
+ // Memory is empty and we can start processing the run file.
+ openFromRunFile();
+ flushMemory();
+ }
+ }
+
+ // *********************
+ // Left side from stream
+ // *********************
+ if (status.runFileStatus == RunFileStatus.NOT_USED && status.leftHasMore) {
+ int c = comparator.compare(accessorLeft, leftTupleIndex, accessorRight, getRightTupleIndex());
+ while (c <= 0) {
+ if (c == 0) {
+ // add to result
+ addToResult(accessorLeft, leftTupleIndex, accessorRight, getRightTupleIndex());
+ // append to memory
+ if (!addToMemory(accessorLeft, leftTupleIndex)) {
+ // memory is full: switch to the run file writing state
+ status.runFileStatus = RunFileStatus.WRITING;
+ // write right tuple to run file
+ addToRunFile(accessorRight, getRightTupleIndex());
+ // break (do not increment left tuple)
+ break;
+ }
+ }
+ incrementLeftTuple();
+ if (!loadLeftTuple()) {
+ return;
+ }
+ c = comparator.compare(accessorLeft, leftTupleIndex, accessorRight, getRightTupleIndex());
+ }
+ }
+ incrementRightTuple();
+ }
+ }
+
+ public void setLeftFrame(ByteBuffer buffer) {
+ leftBuffer = buffer;
+ accessorLeft.reset(leftBuffer);
+ leftTupleIndex = 0;
+ }
+
+ public void setRightFrame(ByteBuffer buffer) {
+ rightBuffer = buffer;
+ accessorRight.reset(rightBuffer);
+ rightBufferTupleIndex = 0;
+ }
+
+ public void setRightRecordDescriptor(RecordDescriptor rightRd) {
+ accessorRight = new FrameTupleAccessor(rightRd);
+ }
+}
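For readers following processMerge() above, here is a minimal in-memory sketch of the same merge pattern: each right tuple is first checked against the left tuples held in memory, then the left stream is advanced. It is not the code from this change. It assumes both inputs are sorted by start point, that intervals are half-open, and that memory never fills up, so the run file path is left out. The class name IntervalMergeJoinSketch, the long[] {start, end} representation, and the compare() semantics (negative = left ends before right starts, zero = overlap, positive = left starts after right ends) are all hypothetical; the comparator in the patch may be defined differently.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class IntervalMergeJoinSketch {

    // Hypothetical comparator for half-open intervals {start, end}:
    // negative if a ends before b starts, positive if a starts after b ends, 0 if they overlap.
    static int compare(long[] a, long[] b) {
        if (a[1] <= b[0]) {
            return -1;
        }
        if (a[0] >= b[1]) {
            return 1;
        }
        return 0;
    }

    // Both inputs must be sorted by start point. Returns {leftIndex, rightIndex} pairs.
    static List<int[]> join(long[][] left, long[][] right) {
        List<int[]> result = new ArrayList<>();
        List<Integer> memory = new ArrayList<>(); // left tuples that may still match later right tuples
        int leftIndex = 0;
        for (int rightIndex = 0; rightIndex < right.length; rightIndex++) {
            // Left side from memory: drop expired tuples, emit overlapping ones.
            for (Iterator<Integer> it = memory.iterator(); it.hasNext();) {
                int m = it.next();
                int c = compare(left[m], right[rightIndex]);
                if (c < 0) {
                    it.remove();
                } else if (c == 0) {
                    result.add(new int[] { m, rightIndex });
                }
            }
            // Left side from stream: advance until a left tuple starts after the right tuple ends.
            while (leftIndex < left.length) {
                int c = compare(left[leftIndex], right[rightIndex]);
                if (c > 0) {
                    break;
                }
                if (c == 0) {
                    result.add(new int[] { leftIndex, rightIndex });
                    memory.add(leftIndex);
                }
                leftIndex++;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long[][] left = { { 1, 5 }, { 4, 10 }, { 12, 15 } };
        long[][] right = { { 3, 6 }, { 9, 13 } };
        for (int[] pair : join(left, right)) {
            System.out.println("left " + pair[0] + " overlaps right " + pair[1]);
        }
    }
}
```

A left tuple is kept in memory only after it has matched at least once, and it is dropped as soon as it ends before the current right tuple starts, which is safe because later right tuples start no earlier.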
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/interval/SortMergeIntervalStatus.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/interval/SortMergeIntervalStatus.java
new file mode 100644
index 0000000..c60ba94
--- /dev/null
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/interval/SortMergeIntervalStatus.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.operators.interval;
+
+import java.io.Serializable;
+
+public class SortMergeIntervalStatus implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ public enum BranchStatus {
+ UNKNOWN,
+ OPENED,
+ DATA_PROCESSING,
+ CLOSED,
+ }
+
+ public enum RunFileStatus {
+ NOT_USED,
+ WRITING,
+ READING,
+ }
+
+ public boolean reloadingLeftFrame = false;
+ public boolean loadRightFrame = false;
+
+ public boolean leftHasMore = true;
+ public boolean rightHasMore = true;
+
+ private BranchStatus leftStatus = BranchStatus.UNKNOWN;
+ private BranchStatus rightStatus = BranchStatus.UNKNOWN;
+
+ public RunFileStatus runFileStatus = RunFileStatus.NOT_USED;
+
+ public SortMergeIntervalStatus() {
+ }
+
+ public BranchStatus getLeftStatus() {
+ return leftStatus;
+ }
+
+ public BranchStatus getRightStatus() {
+ return rightStatus;
+ }
+
+ public void openLeft() {
+ leftStatus = BranchStatus.OPENED;
+ }
+
+ public void openRight() {
+ rightStatus = BranchStatus.OPENED;
+ }
+
+ public void dataLeft() {
+ leftStatus = BranchStatus.DATA_PROCESSING;
+ }
+
+ public void dataRight() {
+ rightStatus = BranchStatus.DATA_PROCESSING;
+ }
+
+ public void closeLeft() {
+ leftStatus = BranchStatus.CLOSED;
+ }
+
+ public void closeRight() {
+ rightStatus = BranchStatus.CLOSED;
+ }
+
+}
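The RunFileStatus field above is assigned directly in three places in the joiner: NOT_USED to WRITING when addToMemory() fails, WRITING to READING in openFromRunFile(), and READING to NOT_USED in closeFromRunFile(). As a rough sketch only, the same cycle could be guarded so that an unexpected transition fails fast. The class RunFileLifecycleSketch and its transitionTo() method are hypothetical and are not part of this change.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;

class RunFileLifecycleSketch {

    enum RunFileStatus {
        NOT_USED,
        WRITING,
        READING,
    }

    // Transitions observed in the joiner: NOT_USED -> WRITING -> READING -> NOT_USED.
    private static final Map<RunFileStatus, EnumSet<RunFileStatus>> ALLOWED =
            new EnumMap<>(RunFileStatus.class);

    static {
        ALLOWED.put(RunFileStatus.NOT_USED, EnumSet.of(RunFileStatus.WRITING));
        ALLOWED.put(RunFileStatus.WRITING, EnumSet.of(RunFileStatus.READING));
        ALLOWED.put(RunFileStatus.READING, EnumSet.of(RunFileStatus.NOT_USED));
    }

    private RunFileStatus status = RunFileStatus.NOT_USED;

    RunFileStatus current() {
        return status;
    }

    // Moves to the next state, or throws if the joiner never makes that transition.
    void transitionTo(RunFileStatus next) {
        if (!ALLOWED.get(status).contains(next)) {
            throw new IllegalStateException("Unexpected run file transition: " + status + " -> " + next);
        }
        status = next;
    }

    public static void main(String[] args) {
        RunFileLifecycleSketch lifecycle = new RunFileLifecycleSketch();
        lifecycle.transitionTo(RunFileStatus.WRITING); // memory is full
        lifecycle.transitionTo(RunFileStatus.READING); // memory has drained
        lifecycle.transitionTo(RunFileStatus.NOT_USED); // run file exhausted
        System.out.println("final status: " + lifecycle.current());
    }
}
```

Whether such a guard is worth the extra indirection in a per-tuple code path is a design choice for the author; the public field in the patch is simpler.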
diff --git a/pom.xml b/pom.xml
index 8d11476..6a86cf5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -8,7 +8,7 @@
! "License"); you may not use this file except in compliance
! with the License. You may obtain a copy of the License at
!
- ! http://www.apache.org/licenses/LICENSE-2.0
+ ! http://www.apache.org/licenses/LICENSE-2.0
!
! Unless required by applicable law or agreed to in writing,
! software distributed under the License is distributed on an
@@ -55,9 +55,9 @@
<invalid.tests>**/DmlTest.java</invalid.tests>
<global.test.includes>**/*TestSuite.java,**/*Test.java,${execution.tests}</global.test.includes>
<global.test.excludes>${optimizer.tests},${metadata.tests},${invalid.tests},${repeated.tests}</global.test.excludes>
- <!-- Versions under dependencymanagement or used in many projects via properties -->
- <algebricks.version>0.2.16-incubating</algebricks.version>
- <hyracks.version>0.2.16-incubating</hyracks.version>
+ <!-- Versions under dependency management or used in many projects via properties -->
+ <algebricks.version>0.2.16-SNAPSHOT</algebricks.version>
+ <hyracks.version>0.2.16-SNAPSHOT</hyracks.version>
<hadoop.version>2.2.0</hadoop.version>
<junit.version>4.11</junit.version>
<commons.io.version>2.4</commons.io.version>
@@ -309,6 +309,15 @@
<id>algebricks-snapshots</id>
<url>http://obelix.ics.uci.edu/nexus/content/repositories/algebricks-snapshots/</url>
</repository>
+ <repository>
+ <snapshots>
+ <enabled>true</enabled>
+ <updatePolicy>always</updatePolicy>
+ <checksumPolicy>fail</checksumPolicy>
+ </snapshots>
+ <id>apache-staging</id>
+ <url>https://repository.apache.org/content/repositories/orgapacheasterix-1005/</url>
+ </repository>
</repositories>
<dependencyManagement>
<dependencies>
--
To view, visit https://asterix-gerrit.ics.uci.edu/424
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ie6bca14b81c41859f0f89cdc1fc95efd393e79f3
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Preston Carman <pr...@apache.org>
Change in asterixdb[master]: Range connector update and new merge interval join.
Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.
Change subject: Range connector update and new merge interval join.
......................................................................
Patch Set 1: Verified-1
Build Failed
https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-notopic/332/ : FAILURE
--
Gerrit-MessageType: comment
Gerrit-Change-Id: Ie6bca14b81c41859f0f89cdc1fc95efd393e79f3
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Preston Carman <pr...@apache.org>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Yingyi Bu <bu...@gmail.com>
Gerrit-HasComments: No
Change in asterixdb[master]: Range connector update and new merge interval join.
Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.
Change subject: Range connector update and new merge interval join.
......................................................................
Patch Set 1:
Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-notopic/332/
--
Gerrit-MessageType: comment
Gerrit-Change-Id: Ie6bca14b81c41859f0f89cdc1fc95efd393e79f3
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Preston Carman <pr...@apache.org>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Yingyi Bu <bu...@gmail.com>
Gerrit-HasComments: No
Change in asterixdb[master]: Range connector update and new merge interval join.
Posted by "Preston Carman (Code Review)" <do...@asterixdb.incubator.apache.org>.
Preston Carman has posted comments on this change.
Change subject: Range connector update and new merge interval join.
......................................................................
Patch Set 1: Code-Review-2
--
Gerrit-MessageType: comment
Gerrit-Change-Id: Ie6bca14b81c41859f0f89cdc1fc95efd393e79f3
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Preston Carman <pr...@apache.org>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Preston Carman <pr...@apache.org>
Gerrit-Reviewer: Yingyi Bu <bu...@gmail.com>
Gerrit-HasComments: No
Change in asterixdb[master]: Range connector update and new merge interval join.
Posted by "Preston Carman (Code Review)" <do...@asterixdb.incubator.apache.org>.
Preston Carman has abandoned this change.
Change subject: Range connector update and new merge interval join.
......................................................................
Abandoned
going to try a new clone
--
Gerrit-MessageType: abandon
Gerrit-Change-Id: Ie6bca14b81c41859f0f89cdc1fc95efd393e79f3
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Preston Carman <pr...@apache.org>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Preston Carman <pr...@apache.org>
Gerrit-Reviewer: Yingyi Bu <bu...@gmail.com>