You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@asterixdb.apache.org by pr...@apache.org on 2016/10/17 19:54:55 UTC

[05/50] [abbrv] asterixdb git commit: added more to the interval hint

added more to the interval hint


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/fd84e345
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/fd84e345
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/fd84e345

Branch: refs/heads/ecarm002/interval_join_merge
Commit: fd84e345ec29f9b85f32132eef1f6e7f98941866
Parents: 0f533e8
Author: Preston Carman <pr...@apache.org>
Authored: Wed Jul 6 18:15:43 2016 -0700
Committer: Preston Carman <pr...@apache.org>
Committed: Wed Jul 6 18:15:43 2016 -0700

----------------------------------------------------------------------
 .../IntervalPartitionJoinPOperator.java         |  20 +-
 .../rules/IntervalSplitPartitioningRule.java    |   1 -
 .../asterix/optimizer/rules/util/JoinUtils.java |  21 +-
 .../interval_overlapping.11.query.aql           |   2 +-
 .../interval_overlapping.12.query.aql           |   2 +-
 .../IntervalJoinExpressionAnnotation.java       |  65 +-
 ...econdaryIndexSearchExpressionAnnotation.java |  15 +-
 .../asterix-lang-aql/src/main/javacc/AQL.jj     |   4 +-
 .../IntervalPartitionJoin.java                  | 649 -------------------
 ...IntervalPartitionJoinOperatorDescriptor.java |  16 +-
 .../IntervalPartitionJoiner.java                | 649 +++++++++++++++++++
 .../IntervalPartitionUtil.java                  |  26 +-
 .../AbstractExpressionAnnotation.java           |  35 +
 .../ExpressionAnnotationNoCopyImpl.java         |  14 +-
 .../IndexedNLJoinExpressionAnnotation.java      |  14 +-
 15 files changed, 788 insertions(+), 745 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java
index 414d0b4..1eff2aa 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java
@@ -33,18 +33,18 @@ import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
 public class IntervalPartitionJoinPOperator extends AbstractIntervalJoinPOperator {
 
     private final int memSizeInFrames;
-    private final int probeTupleCount;
-    private final int probeMaxDuration;
-    private final int buildTupleCount;
-    private final int buildMaxDuration;
+    private final long probeTupleCount;
+    private final long probeMaxDuration;
+    private final long buildTupleCount;
+    private final long buildMaxDuration;
     private final int avgTuplesInFrame;
 
     private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoinPOperator.class.getName());
 
     public IntervalPartitionJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType,
             List<LogicalVariable> sideLeftOfEqualities, List<LogicalVariable> sideRightOfEqualities,
-            int memSizeInFrames, int buildTupleCount, int probeTupleCount, int buildMaxDuration, int probeMaxDuration,
-            int avgTuplesInFrame, IIntervalMergeJoinCheckerFactory mjcf, IRangeMap rangeMap) {
+            int memSizeInFrames, long buildTupleCount, long probeTupleCount, long buildMaxDuration,
+            long probeMaxDuration, int avgTuplesInFrame, IIntervalMergeJoinCheckerFactory mjcf, IRangeMap rangeMap) {
         super(kind, partitioningType, sideLeftOfEqualities, sideRightOfEqualities, mjcf, rangeMap);
         this.memSizeInFrames = memSizeInFrames;
         this.buildTupleCount = buildTupleCount;
@@ -62,19 +62,19 @@ public class IntervalPartitionJoinPOperator extends AbstractIntervalJoinPOperato
                 + ".");
     }
 
-    public int getProbeTupleCount() {
+    public long getProbeTupleCount() {
         return probeTupleCount;
     }
 
-    public int getProbeMaxDuration() {
+    public long getProbeMaxDuration() {
         return probeMaxDuration;
     }
 
-    public int getBuildTupleCount() {
+    public long getBuildTupleCount() {
         return buildTupleCount;
     }
 
-    public int getBuildMaxDuration() {
+    public long getBuildMaxDuration() {
         return buildMaxDuration;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java
index 2772e68..629606c 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java
@@ -370,7 +370,6 @@ public class IntervalSplitPartitioningRule implements IAlgebraicRewriteRule {
             flags[i] = true;
         }
         ReplicateOperator splitOperator = new ReplicateOperator(flags.length, flags);
-        //        ReplicatePOperator splitPOperator = new ReplicatePOperator();
         IntervalLocalRangeSplitterPOperator splitPOperator = new IntervalLocalRangeSplitterPOperator(joinKeyLogicalVars,
                 rangeMap);
         splitOperator.setPhysicalOperator(splitPOperator);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java
index b2f010c..14b7aa6 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java
@@ -27,6 +27,8 @@ import java.util.logging.Logger;
 import org.apache.asterix.algebra.operators.physical.IntervalIndexJoinPOperator;
 import org.apache.asterix.algebra.operators.physical.IntervalPartitionJoinPOperator;
 import org.apache.asterix.common.annotations.IntervalJoinExpressionAnnotation;
+import org.apache.asterix.common.annotations.JoinIntervalMaxDurationExpressionAnnotation;
+import org.apache.asterix.common.annotations.JoinRecordCountsExpressionAnnotation;
 import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
 import org.apache.asterix.runtime.operators.joins.AfterIntervalMergeJoinCheckerFactory;
 import org.apache.asterix.runtime.operators.joins.BeforeIntervalMergeJoinCheckerFactory;
@@ -89,7 +91,7 @@ public class JoinUtils {
             } else if (ijea.isPartitionJoin()) {
                 // Overlapping Interval Partition.
                 LOGGER.fine("Interval Join - Cluster Parititioning");
-                setIntervalPartitionJoinOp(op, fi, sideLeft, sideRight, ijea.getRangeMap(), context);
+                setIntervalPartitionJoinOp(op, fi, sideLeft, sideRight, ijea.getRangeMap(), ijea, context);
             } else if (ijea.isSpatialJoin()) {
                 // Spatial Partition.
                 LOGGER.fine("Interval Join - Spatial Partitioning");
@@ -112,6 +114,7 @@ public class JoinUtils {
         return null;
     }
 
+
     private static void setSortMergeIntervalJoinOp(AbstractBinaryJoinOperator op, FunctionIdentifier fi,
             List<LogicalVariable> sideLeft, List<LogicalVariable> sideRight, IRangeMap rangeMap,
             IOptimizationContext context) {
@@ -122,12 +125,20 @@ public class JoinUtils {
 
     private static void setIntervalPartitionJoinOp(AbstractBinaryJoinOperator op, FunctionIdentifier fi,
             List<LogicalVariable> sideLeft, List<LogicalVariable> sideRight, IRangeMap rangeMap,
-            IOptimizationContext context) {
+            IntervalJoinExpressionAnnotation ijea, IOptimizationContext context) {
+        long leftCount = ijea.getLeftRecordCount() > 0 ? ijea.getLeftRecordCount()
+                : getCardinality(sideLeft, context);
+        long rightCount = ijea.getRightRecordCount() > 0 ? ijea.getRightRecordCount()
+                : getCardinality(sideRight, context);
+        long leftMaxDuration = ijea.getLeftMaxDuration() > 0 ? ijea.getLeftMaxDuration()
+                : getMaxDuration(sideLeft, context);
+        long rightMaxDuration = ijea.getRightMaxDuration() > 0 ? ijea.getRightMaxDuration()
+                : getMaxDuration(sideRight, context);
+
         IIntervalMergeJoinCheckerFactory mjcf = getIntervalMergeJoinCheckerFactory(fi, rangeMap);
         op.setPhysicalOperator(new IntervalPartitionJoinPOperator(op.getJoinKind(), JoinPartitioningType.BROADCAST,
-                sideLeft, sideRight, context.getPhysicalOptimizationConfig().getMaxFramesForJoin(),
-                getCardinality(sideLeft, context), getCardinality(sideRight, context),
-                getMaxDuration(sideLeft, context), getMaxDuration(sideRight, context),
+                sideLeft, sideRight, context.getPhysicalOptimizationConfig().getMaxFramesForJoin(), leftCount,
+                rightCount, leftMaxDuration, rightMaxDuration,
                 context.getPhysicalOptimizationConfig().getMaxRecordsPerFrame(), mjcf, rangeMap));
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/asterixdb/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlapping/interval_overlapping.11.query.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlapping/interval_overlapping.11.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlapping/interval_overlapping.11.query.aql
index 6222c86..1fe23da 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlapping/interval_overlapping.11.query.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlapping/interval_overlapping.11.query.aql
@@ -26,7 +26,7 @@ use dataverse TinyCollege;
 
 for $f in dataset Staff
 for $d in dataset Students
-where /*+ interval-partition-join [10000, 11000, 12000, 14000, 15000] */ interval-overlapping($d.attendance, $f.employment)
+where /*+ interval-partition-join [10000,11000,12000,14000,15000] 7 7 400 400 */ interval-overlapping($d.attendance, $f.employment)
 /*+ range ["F", "L", "R"] */
 order by $f.name, $d.name
 return { "staff" : $f.name, "student" : $d.name }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/asterixdb/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlapping/interval_overlapping.12.query.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlapping/interval_overlapping.12.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlapping/interval_overlapping.12.query.aql
index 337221d..2057130 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlapping/interval_overlapping.12.query.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/temporal/interval_joins/interval_overlapping/interval_overlapping.12.query.aql
@@ -26,7 +26,7 @@ use dataverse TinyCollege;
 
 for $f in dataset Staff
 for $d in dataset Students
-where /*+ interval-partition-join [10000, 11000, 12000, 14000, 15000] */ interval-overlapping($f.employment, $d.attendance)
+where /*+ interval-partition-join [10000,11000,12000,14000,15000] 7 7 400 400 */ interval-overlapping($f.employment, $d.attendance)
 /*+ range ["F", "L", "R"] */
 order by $f.name, $d.name
 return { "staff" : $f.name, "student" : $d.name }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/annotations/IntervalJoinExpressionAnnotation.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/annotations/IntervalJoinExpressionAnnotation.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/annotations/IntervalJoinExpressionAnnotation.java
index 342b9e8..f2f325d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/annotations/IntervalJoinExpressionAnnotation.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/annotations/IntervalJoinExpressionAnnotation.java
@@ -18,10 +18,11 @@
  */
 package org.apache.asterix.common.annotations;
 
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractExpressionAnnotation;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionAnnotation;
 import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
 
-public class IntervalJoinExpressionAnnotation implements IExpressionAnnotation {
+public class IntervalJoinExpressionAnnotation extends AbstractExpressionAnnotation {
 
     private static final String RAW_HINT_STRING = "interval-raw-join";
     private static final String PARTITION_HINT_STRING = "interval-partition-join";
@@ -30,19 +31,13 @@ public class IntervalJoinExpressionAnnotation implements IExpressionAnnotation {
     private static final String INDEX_HINT_STRING = "interval-index-join";
     public static final IntervalJoinExpressionAnnotation INSTANCE = new IntervalJoinExpressionAnnotation();
 
-    private Object object;
-    private IRangeMap map;
-    private String joinType;
+    private IRangeMap map = null;
+    private String joinType = null;
+    private long leftMaxDuration = -1;
+    private long rightMaxDuration = -1;
+    private long leftRecordCount = -1;
+    private long rightRecordCount = -1;
 
-    @Override
-    public Object getObject() {
-        return object;
-    }
-
-    @Override
-    public void setObject(Object object) {
-        this.object = object;
-    }
 
     @Override
     public IExpressionAnnotation copy() {
@@ -51,15 +46,25 @@ public class IntervalJoinExpressionAnnotation implements IExpressionAnnotation {
         return clone;
     }
 
-    public void setRangeMap(IRangeMap map) {
-        this.map = map;
+    @Override
+    public void setObject(Object object) {
+        super.setObject(object);
+        parseHint();
     }
 
-    public IRangeMap getRangeMap() {
-        return map;
+    private void parseHint() {
+        String[] args = ((String) object).split(" ");
+        setJoinType(args[0]);
+
+        if (joinType.equals(PARTITION_HINT_STRING) && args.length == 6) {
+            leftRecordCount = Long.valueOf(args[2]);
+            rightRecordCount = Long.valueOf(args[3]);
+            leftMaxDuration = Long.valueOf(args[4]);
+            rightMaxDuration = Long.valueOf(args[5]);
+        }
     }
 
-    public void setJoinType(String hint) {
+    private void setJoinType(String hint) {
         if (hint.startsWith(RAW_HINT_STRING)) {
             joinType = RAW_HINT_STRING;
         } else if (hint.startsWith(PARTITION_HINT_STRING)) {
@@ -73,6 +78,30 @@ public class IntervalJoinExpressionAnnotation implements IExpressionAnnotation {
         }
     }
 
+    public long getLeftMaxDuration() {
+        return leftMaxDuration;
+    }
+
+    public long getRightMaxDuration() {
+        return rightMaxDuration;
+    }
+
+    public long getLeftRecordCount() {
+        return leftRecordCount;
+    }
+
+    public long getRightRecordCount() {
+        return rightRecordCount;
+    }
+
+    public void setRangeMap(IRangeMap map) {
+        this.map = map;
+    }
+
+    public IRangeMap getRangeMap() {
+        return map;
+    }
+
     public String getRangeType() {
         return joinType;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/annotations/SkipSecondaryIndexSearchExpressionAnnotation.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/annotations/SkipSecondaryIndexSearchExpressionAnnotation.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/annotations/SkipSecondaryIndexSearchExpressionAnnotation.java
index e1dd1cb..de1e1fa 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/annotations/SkipSecondaryIndexSearchExpressionAnnotation.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/annotations/SkipSecondaryIndexSearchExpressionAnnotation.java
@@ -18,25 +18,14 @@
  */
 package org.apache.asterix.common.annotations;
 
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractExpressionAnnotation;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionAnnotation;
 
-public class SkipSecondaryIndexSearchExpressionAnnotation implements IExpressionAnnotation {
+public class SkipSecondaryIndexSearchExpressionAnnotation extends AbstractExpressionAnnotation {
 
     public static final String HINT_STRING = "skip-index";
     public static final SkipSecondaryIndexSearchExpressionAnnotation INSTANCE = new SkipSecondaryIndexSearchExpressionAnnotation();
 
-    private Object object;
-
-    @Override
-    public Object getObject() {
-        return object;
-    }
-
-    @Override
-    public void setObject(Object object) {
-        this.object = object;
-    }
-
     @Override
     public IExpressionAnnotation copy() {
         SkipSecondaryIndexSearchExpressionAnnotation clone = new SkipSecondaryIndexSearchExpressionAnnotation();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
index 80ddeef..bbba41d 100644
--- a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
+++ b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
@@ -2112,10 +2112,10 @@ Expression FunctionCallExpr() throws ParseException:
         if (IntervalJoinExpressionAnnotation.isIntervalJoinHint(hint)) {
           IntervalJoinExpressionAnnotation ijea = IntervalJoinExpressionAnnotation.INSTANCE;
           ijea.setObject(hint);
-          ijea.setJoinType(hint);
           try {
             if (ijea.hasRangeArgument()) {
-              ijea.setRangeMap(RangeMapBuilder.parseHint(hint.substring(IntervalJoinExpressionAnnotation.getHintLength(hint))));
+              String rangeHint = hint.substring(IntervalJoinExpressionAnnotation.getHintLength(hint), hint.indexOf(']', 0) + 1);
+              ijea.setRangeMap(RangeMapBuilder.parseHint(rangeHint));
             }
           } catch (AsterixException e) {
             {if (true) throw new ParseException(e.getMessage());}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoin.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoin.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoin.java
deleted file mode 100644
index 1bccbd2..0000000
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoin.java
+++ /dev/null
@@ -1,649 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.operators.joins.intervalpartition;
-
-import java.nio.ByteBuffer;
-import java.util.BitSet;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.Map.Entry;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinChecker;
-import org.apache.commons.io.FileUtils;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.io.RunFileReader;
-import org.apache.hyracks.dataflow.common.io.RunFileWriter;
-import org.apache.hyracks.dataflow.std.buffermanager.IPartitionedMemoryConstrain;
-import org.apache.hyracks.dataflow.std.buffermanager.VPartitionTupleBufferManager;
-import org.apache.hyracks.dataflow.std.structures.TuplePointer;
-
-/**
- * This class mainly applies one level of HHJ on a pair of
- * relations. It is always called by the descriptor.
- */
-public class IntervalPartitionJoin {
-
-    // Used for special probe BigObject which can not be held into the Join memory
-    private FrameTupleAppender bigProbeFrameAppender;
-
-    enum SIDE {
-        BUILD,
-        PROBE
-    }
-
-    private IHyracksTaskContext ctx;
-
-    private final String buildRelName;
-    private final String probeRelName;
-
-    private final ITuplePartitionComputer buildHpc;
-    private final ITuplePartitionComputer probeHpc;
-
-    private final RecordDescriptor buildRd;
-    private final RecordDescriptor probeRd;
-
-    private RunFileWriter[] buildRFWriters; //writing spilled build partitions
-    private RunFileWriter[] probeRFWriters; //writing spilled probe partitions
-
-    private final int memForJoin;
-    private final int k;
-    private final int numOfPartitions;
-    private InMemoryIntervalPartitionJoin[] inMemJoiner; //Used for joining resident partitions
-
-    private VPartitionTupleBufferManager buildBufferManager;
-    private VPartitionTupleBufferManager probeBufferManager;
-
-    private final FrameTupleAccessor accessorBuild;
-    private final FrameTupleAccessor accessorProbe;
-
-    private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoin.class.getName());
-
-    // stats information
-    private IntervalPartitionJoinData ipjd;
-
-    private IFrame reloadBuffer;
-    private TuplePointer tempPtr = new TuplePointer();
-
-    private IIntervalMergeJoinChecker imjc;
-
-    public IntervalPartitionJoin(IHyracksTaskContext ctx, int memForJoin, int k, int numOfPartitions,
-            String buildRelName, String probeRelName, IIntervalMergeJoinChecker imjc, RecordDescriptor buildRd,
-            RecordDescriptor probeRd, ITuplePartitionComputer buildHpc, ITuplePartitionComputer probeHpc) {
-        this.ctx = ctx;
-        this.memForJoin = memForJoin;
-        this.k = k;
-        this.buildRd = buildRd;
-        this.probeRd = probeRd;
-        this.buildHpc = buildHpc;
-        this.probeHpc = probeHpc;
-        this.imjc = imjc;
-        this.buildRelName = buildRelName;
-        this.probeRelName = probeRelName;
-
-        this.numOfPartitions = numOfPartitions;
-        this.buildRFWriters = new RunFileWriter[numOfPartitions];
-        this.probeRFWriters = new RunFileWriter[numOfPartitions];
-        this.inMemJoiner = new InMemoryIntervalPartitionJoin[numOfPartitions];
-
-        this.accessorBuild = new FrameTupleAccessor(buildRd);
-        this.accessorProbe = new FrameTupleAccessor(probeRd);
-
-        ipjd = new IntervalPartitionJoinData(k, imjc, numOfPartitions);
-    }
-
-    public void initBuild() throws HyracksDataException {
-        buildBufferManager = new VPartitionTupleBufferManager(ctx, getPartitionMemoryConstrain(), numOfPartitions,
-                memForJoin * ctx.getInitialFrameSize());
-    }
-
-    private IPartitionedMemoryConstrain getPartitionMemoryConstrain() {
-        return VPartitionTupleBufferManager.NO_CONSTRAIN;
-    }
-
-    public void build(ByteBuffer buffer) throws HyracksDataException {
-        accessorBuild.reset(buffer);
-        int tupleCount = accessorBuild.getTupleCount();
-
-        for (int i = 0; i < tupleCount; ++i) {
-            int pid = buildHpc.partition(accessorBuild, i, k);
-            processTuple(i, pid);
-            ipjd.buildIncrementCount(pid);
-        }
-    }
-
-    public void closeBuild() throws HyracksDataException {
-        int inMemoryPartitions = 0;
-        int totalBuildPartitions = 0;
-        flushAndClearBuildSpilledPartition();
-
-        // Trying to bring back as many spilled partitions as possible, making them resident
-        bringBackSpilledPartitionIfHasMoreMemory();
-
-        // Update build partition join map based on partitions with actual data.
-        for (int i = ipjd.buildNextInMemory(0); i >= 0; i = ipjd.buildNextInMemory(i + 1)) {
-            if (ipjd.buildGetCount(i) == 0) {
-                ipjd.buildRemoveFromJoin(i);
-            } else if (ipjd.buildGetCount(i) > 0) {
-                // Set up build memory for processing joins for partitions in memory.
-                createInMemoryJoiner(i);
-                inMemoryPartitions++;
-                totalBuildPartitions += ipjd.buildGetCount(i);
-            }
-        }
-
-        if (LOGGER.isLoggable(Level.FINE)) {
-            LOGGER.fine("IntervalPartitionJoin has closed the build phase. Total tuples: " + totalBuildPartitions
-                    + ", In memory partitions: " + inMemoryPartitions + ", Spilled partitions: "
-                    + ipjd.buildGetSpilledCount());
-        }
-    }
-
-    private void processTuple(int tid, int pid) throws HyracksDataException {
-        while (!buildBufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) {
-            int victimPartition = selectPartitionToSpill();
-            if (victimPartition < 0) {
-                throw new HyracksDataException(
-                        "No more space left in the memory buffer, please give join more memory budgets.");
-            }
-            spillPartition(victimPartition);
-        }
-    }
-
-    private int selectPartitionToSpill() {
-        int partitionToSpill = selectLargestSpilledPartition();
-        int maxToSpillPartSize = 0;
-        if (partitionToSpill < 0 || (maxToSpillPartSize = buildBufferManager.getPhysicalSize(partitionToSpill)) == ctx
-                .getInitialFrameSize()) {
-            int partitionInMem = selectNextInMemoryPartitionToSpill();
-            if (partitionInMem >= 0 && buildBufferManager.getPhysicalSize(partitionInMem) > maxToSpillPartSize) {
-                partitionToSpill = partitionInMem;
-            }
-        }
-        return partitionToSpill;
-    }
-
-    /**
-     * Select next partition to spill. The partitions have been numbered in the order they should be spilled.
-     *
-     * @return
-     */
-    private int selectNextInMemoryPartitionToSpill() {
-        for (int i = ipjd.buildNextInMemoryWithResults(0); i >= 0; i = ipjd.buildNextInMemoryWithResults(i + 1)) {
-            if (!ipjd.buildIsSpilled(i) && buildBufferManager.getPhysicalSize(i) > 0) {
-                return i;
-            }
-        }
-        return -1;
-    }
-
-    private int selectLargestSpilledPartition() {
-        int pid = -1;
-        int max = 0;
-        for (int i = ipjd.buildNextSpilled(0); i >= 0; i = ipjd.buildNextSpilled(i + 1)) {
-            int partSize = buildBufferManager.getPhysicalSize(i);
-            if (partSize > max) {
-                max = partSize;
-                pid = i;
-            }
-        }
-        return pid;
-    }
-
-    private void spillPartition(int pid) throws HyracksDataException {
-        RunFileWriter writer = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.BUILD);
-        buildBufferManager.flushPartition(pid, writer);
-        buildBufferManager.clearPartition(pid);
-        ipjd.buildSpill(pid);
-    }
-
-    private RunFileWriter getSpillWriterOrCreateNewOneIfNotExist(int pid, SIDE whichSide) throws HyracksDataException {
-        RunFileWriter[] runFileWriters = null;
-        String refName = null;
-        switch (whichSide) {
-            case BUILD:
-                runFileWriters = buildRFWriters;
-                refName = buildRelName;
-                break;
-            case PROBE:
-                refName = probeRelName;
-                runFileWriters = probeRFWriters;
-                break;
-            default:
-        }
-        RunFileWriter writer = runFileWriters[pid];
-        if (writer == null) {
-            FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(refName);
-            writer = new RunFileWriter(file, ctx.getIOManager());
-            writer.open();
-            runFileWriters[pid] = writer;
-        }
-        return writer;
-    }
-
-    public void clearBuildMemory() throws HyracksDataException {
-        for (int pid = 0; pid < numOfPartitions; ++pid) {
-            if (buildBufferManager.getNumTuples(pid) > 0) {
-                buildBufferManager.clearPartition(pid);
-                ipjd.buildRemoveFromJoin(pid);
-            }
-        }
-    }
-
-    private void flushAndClearBuildSpilledPartition() throws HyracksDataException {
-        for (int pid = ipjd.buildNextSpilled(0); pid >= 0; pid = ipjd.buildNextSpilled(pid + 1)) {
-            if (buildBufferManager.getNumTuples(pid) > 0) {
-                buildBufferManager.flushPartition(pid, getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.BUILD));
-                buildBufferManager.clearPartition(pid);
-                buildRFWriters[pid].close();
-            }
-        }
-    }
-
-    private void flushAndClearProbeSpilledPartition() throws HyracksDataException {
-        for (int pid = 0; pid < numOfPartitions; ++pid) {
-            if (probeBufferManager.getNumTuples(pid) > 0) {
-                probeBufferManager.flushPartition(pid, getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.PROBE));
-                probeBufferManager.clearPartition(pid);
-                probeRFWriters[pid].close();
-            }
-        }
-    }
-
-    private void bringBackSpilledPartitionIfHasMoreMemory() throws HyracksDataException {
-        // we need number of |spilledPartitions| buffers to store the probe data
-        int freeSpace = (memForJoin - ipjd.buildGetSpilledCount()) * ctx.getInitialFrameSize();
-        for (int i = ipjd.buildNextInMemoryWithResults(0); i >= 0; i = ipjd.buildNextInMemoryWithResults(i + 1)) {
-            freeSpace -= buildBufferManager.getPhysicalSize(i);
-        }
-
-        int pid = 0;
-        while ((pid = selectPartitionsToReload(freeSpace, pid)) >= 0) {
-            if (!loadPartitionInMem(pid, buildRFWriters[pid])) {
-                return;
-            }
-            freeSpace -= buildBufferManager.getPhysicalSize(pid);
-        }
-    }
-
-    private boolean loadPartitionInMem(int pid, RunFileWriter wr) throws HyracksDataException {
-        RunFileReader r = wr.createDeleteOnCloseReader();
-        r.open();
-        if (reloadBuffer == null) {
-            reloadBuffer = new VSizeFrame(ctx);
-        }
-        while (r.nextFrame(reloadBuffer)) {
-            accessorBuild.reset(reloadBuffer.getBuffer());
-            for (int tid = 0; tid < accessorBuild.getTupleCount(); tid++) {
-                if (!buildBufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) {
-                    // for some reason (e.g. due to fragmentation) if the inserting failed, we need to clear the occupied frames
-                    buildBufferManager.clearPartition(pid);
-                    r.close();
-                    return false;
-                }
-            }
-        }
-
-        r.close();
-        ipjd.buildLoad(pid);
-        buildRFWriters[pid] = null;
-        return true;
-    }
-
-    private int selectPartitionsToReload(int freeSpace, int pid) {
-        for (int id = ipjd.buildNextSpilled(0); id >= 0; id = ipjd.buildNextSpilled(id + 1)) {
-            assert buildRFWriters[id].getFileSize() > 0 : "How comes a spilled partition have size 0?";
-            if (freeSpace >= buildRFWriters[id].getFileSize()) {
-                return id;
-            }
-        }
-        return -1;
-    }
-
-    private void createInMemoryJoiner(int pid) throws HyracksDataException {
-        this.inMemJoiner[pid] = new InMemoryIntervalPartitionJoin(ctx,
-                buildBufferManager.getPartitionFrameBufferManager(pid), imjc, buildRd, probeRd);
-    }
-
-    private void closeInMemoryJoiner(int pid, IFrameWriter writer) throws HyracksDataException {
-        this.inMemJoiner[pid].closeJoin(writer);
-        this.inMemJoiner[pid] = null;
-    }
-
-    public void initProbe() throws HyracksDataException {
-        int probeMemory = numOfPartitions > memForJoin ? memForJoin : numOfPartitions;
-        probeBufferManager = new VPartitionTupleBufferManager(ctx, getPartitionMemoryConstrain(), numOfPartitions,
-                (probeMemory) * ctx.getInitialFrameSize());
-
-        probeRFWriters = new RunFileWriter[numOfPartitions];
-    }
-
-    public void probe(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
-        accessorProbe.reset(buffer);
-        int tupleCount = accessorProbe.getTupleCount();
-
-        for (int i = 0; i < tupleCount; ++i) {
-            int pid = probeHpc.partition(accessorProbe, i, k);
-            if (!ipjd.hasProbeJoinMap(pid)) {
-                // Set probe join map
-                ipjd.setProbeJoinMap(pid,
-                        IntervalPartitionUtil.getProbeJoinPartitions(pid, ipjd.buildPSizeInTups, imjc, k));
-            }
-
-            // Tuple has potential match from build phase
-            if (!ipjd.isProbeJoinMapEmpty(pid)) {
-                if (ipjd.probeHasSpilled(pid)) {
-                    // pid is Spilled
-                    while (!probeBufferManager.insertTuple(pid, accessorProbe, i, tempPtr)) {
-                        int victim = pid;
-                        if (probeBufferManager.getNumTuples(pid) == 0) {
-                            // current pid is empty, choose the biggest one
-                            victim = selectLargestSpilledPartition();
-                        }
-                        if (victim < 0) {
-                            // current tuple is too big for all the free space
-                            flushBigProbeObjectToDisk(pid, accessorProbe, i);
-                            break;
-                        }
-                        RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(victim, SIDE.PROBE);
-                        probeBufferManager.flushPartition(victim, runFileWriter);
-                        probeBufferManager.clearPartition(victim);
-                    }
-                }
-                for (Iterator<Integer> pidIterator = ipjd.getProbeJoinMap(pid); pidIterator.hasNext();) {
-                    // pid has join partitions that are Resident
-                    int j = pidIterator.next();
-                    if (inMemJoiner[j] != null) {
-                        inMemJoiner[j].join(accessorProbe, i, writer);
-                    }
-                }
-            }
-            ipjd.probeIncrementCount(pid);
-        }
-    }
-
-    public void closeProbe(IFrameWriter writer) throws HyracksDataException {
-        // We do NOT join the spilled partitions here, that decision is made at the descriptor level (which join technique to use)
-        for (int i = 0; i < inMemJoiner.length; ++i) {
-            if (inMemJoiner[i] != null) {
-                closeInMemoryJoiner(i, writer);
-                ipjd.buildLogJoined(i);
-            }
-        }
-        clearBuildMemory();
-        flushAndClearProbeSpilledPartition();
-        probeBufferManager.close();
-        probeBufferManager = null;
-    }
-
-    private void flushBigProbeObjectToDisk(int pid, FrameTupleAccessor accessorProbe, int i)
-            throws HyracksDataException {
-        if (bigProbeFrameAppender == null) {
-            bigProbeFrameAppender = new FrameTupleAppender(new VSizeFrame(ctx));
-        }
-        RunFileWriter runFileWriter = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.PROBE);
-        if (!bigProbeFrameAppender.append(accessorProbe, i)) {
-            throw new HyracksDataException("The given tuple is too big");
-        }
-        bigProbeFrameAppender.write(runFileWriter, true);
-    }
-
-    public RunFileReader getBuildRFReader(int pid) throws HyracksDataException {
-        return (buildRFWriters[pid] == null) ? null : (buildRFWriters[pid]).createReader();
-    }
-
-    public RunFileReader getProbeRFReader(int pid) throws HyracksDataException {
-        return (probeRFWriters[pid] == null) ? null : (probeRFWriters[pid]).createReader();
-    }
-
-    public void joinSpilledPartitions(IFrameWriter writer) throws HyracksDataException {
-        LinkedHashMap<Integer, LinkedHashSet<Integer>> probeInMemoryJoinMap;
-        if (reloadBuffer == null) {
-            reloadBuffer = new VSizeFrame(ctx);
-        }
-        HashSet<Integer> inMemory = new HashSet<>();
-        while (ipjd.buildGetSpilledCount() > 0) {
-            // Load back spilled build partitions.
-            // TODO only load partition required for spill join. Consider both sides.
-            bringBackSpilledPartitionIfHasMoreMemory();
-
-            probeInMemoryJoinMap = ipjd.probeGetInMemoryJoinMap();
-
-            // Create in memory joiners.
-            for (int pid = ipjd.buildNextInMemoryWithResults(0); pid >= 0; pid = ipjd
-                    .buildNextInMemoryWithResults(pid + 1)) {
-                createInMemoryJoiner(pid);
-                inMemory.add(pid);
-            }
-
-            // Join all build partitions with disk probe partitions.
-            for (Entry<Integer, LinkedHashSet<Integer>> entry : probeInMemoryJoinMap.entrySet()) {
-                if (ipjd.probeGetCount(entry.getKey()) > 0 && probeInMemoryJoinMap.get(entry.getKey()).isEmpty()) {
-                    RunFileReader pReader = getProbeRFReader(entry.getKey());
-                    pReader.open();
-                    while (pReader.nextFrame(reloadBuffer)) {
-                        accessorProbe.reset(reloadBuffer.getBuffer());
-                        for (int i = 0; i < accessorProbe.getTupleCount(); ++i) {
-                            // Tuple has potential match from build phase
-                            for (Integer j : probeInMemoryJoinMap.get(entry.getKey())) {
-                                // j has join partitions that are Resident
-                                if (inMemJoiner[j] != null) {
-                                    inMemJoiner[j].join(accessorProbe, i, writer);
-                                }
-                            }
-                        }
-                    }
-                    pReader.close();
-                }
-            }
-
-            // Clean up build memory.
-            for (int pid = ipjd.buildNextInMemoryWithResults(0); pid >= 0; pid = ipjd
-                    .buildNextInMemoryWithResults(pid + 1)) {
-                closeInMemoryJoiner(pid, writer);
-                ipjd.buildLogJoined(pid);
-            }
-            inMemory.clear();
-            clearBuildMemory();
-        }
-    }
-
-    class IntervalPartitionJoinData {
-        private LinkedHashMap<Integer, LinkedHashSet<Integer>> probeJoinMap;
-
-        private int[] buildPSizeInTups;
-        private int[] probePSizeInTups;
-
-        private BitSet buildJoinedCompleted; //0=waiting, 1=joined
-        private BitSet buildSpilledStatus; //0=resident, 1=spilled
-        private BitSet buildInMemoryStatus; //0=unknown, 1=resident
-        private BitSet probeSpilledStatus; //0=resident, 1=spilled
-
-        public IntervalPartitionJoinData(int k, IIntervalMergeJoinChecker imjc, int numberOfPartitions) {
-            probeJoinMap = new LinkedHashMap<>();
-
-            buildPSizeInTups = new int[numberOfPartitions];
-            probePSizeInTups = new int[numberOfPartitions];
-
-            buildJoinedCompleted = new BitSet(numberOfPartitions);
-            buildInMemoryStatus = new BitSet(numberOfPartitions);
-            buildSpilledStatus = new BitSet(numberOfPartitions);
-            probeSpilledStatus = new BitSet(numberOfPartitions);
-        }
-
-        public LinkedHashMap<Integer, LinkedHashSet<Integer>> probeGetInMemoryJoinMap() {
-            return IntervalPartitionUtil.getInMemorySpillJoinMap(probeJoinMap, buildInMemoryStatus, probeSpilledStatus);
-        }
-
-        public boolean hasProbeJoinMap(int pid) {
-            return probeJoinMap.containsKey(pid);
-        }
-
-        public boolean isProbeJoinMapEmpty(int pid) {
-            return probeJoinMap.get(pid).isEmpty();
-        }
-
-        public Iterator<Integer> getProbeJoinMap(int pid) {
-            return probeJoinMap.get(pid).iterator();
-        }
-
-        public void setProbeJoinMap(int pid, LinkedHashSet<Integer> map) {
-            probeJoinMap.put(new Integer(pid), map);
-            for (Integer i : map) {
-                if (buildIsSpilled(i)) {
-                    // Build join partition has spilled. Now spill the probe also.
-                    probeSpilledStatus.set(pid);
-                }
-            }
-        }
-
-        public void buildIncrementCount(int pid) {
-            buildInMemoryStatus.set(pid);
-            buildPSizeInTups[pid]++;
-        }
-
-        public int buildGetCount(int pid) {
-            return buildPSizeInTups[pid];
-        }
-
-        public void buildLogJoined(int pid) {
-            buildSpilledStatus.clear(pid);
-            buildJoinedCompleted.set(pid);
-        }
-
-        public void buildRemoveFromJoin(int pid) {
-            buildSpilledStatus.clear(pid);
-            buildJoinedCompleted.set(pid);
-        }
-
-        public boolean buildHasBeenJoined(int pid) {
-            return buildJoinedCompleted.get(pid);
-        }
-
-        public int buildGetSpilledCount() {
-            return buildSpilledStatus.cardinality();
-        }
-
-        public void buildSpill(int pid) {
-            buildInMemoryStatus.clear(pid);
-            buildSpilledStatus.set(pid);
-        }
-
-        public void buildLoad(int pid) {
-            buildInMemoryStatus.set(pid);
-            buildSpilledStatus.clear(pid);
-        }
-
-        public boolean buildIsSpilled(int pid) {
-            return buildSpilledStatus.get(pid);
-        }
-
-        public int buildNextSpilled(int pid) {
-            return buildSpilledStatus.nextSetBit(pid);
-        }
-
-        public int buildNextInMemoryWithResults(int pid) {
-            int nextPid = buildNextInMemory(pid);
-            do {
-                if (nextPid < 0 || buildGetCount(nextPid) > 0) {
-                    return nextPid;
-                }
-                nextPid = buildNextInMemory(nextPid + 1);
-            } while (nextPid >= 0);
-            return -1;
-        }
-
-        public int buildNextInMemory(int pid) {
-            int nextPid =  buildSpilledStatus.nextClearBit(pid);
-            if (nextPid >= numOfPartitions) {
-                return -1;
-            }
-            do {
-                if (!buildHasBeenJoined(nextPid)) {
-                    return nextPid;
-                }
-                nextPid = buildSpilledStatus.nextClearBit(nextPid + 1);
-            } while (nextPid >= 0 && nextPid < numOfPartitions);
-            return -1;
-        }
-
-        public void probeIncrementCount(int pid) {
-            probePSizeInTups[pid]++;
-        }
-
-        public int probeGetCount(int pid) {
-            return probePSizeInTups[pid];
-        }
-
-        public void probeSpill(int pid) {
-            probeSpilledStatus.set(pid);
-        }
-
-        public boolean probeHasSpilled(int pid) {
-            return probeSpilledStatus.get(pid);
-        }
-
-        public int buildGetMaxPartitionSize() {
-            int max = buildPSizeInTups[0];
-            for (int i = 1; i < buildPSizeInTups.length; i++) {
-                if (buildPSizeInTups[i] > max) {
-                    max = buildPSizeInTups[i];
-                }
-            }
-            return max;
-        }
-
-        public int probeGetMaxPartitionSize() {
-            int max = probePSizeInTups[0];
-            for (int i = 1; i < probePSizeInTups.length; i++) {
-                if (probePSizeInTups[i] > max) {
-                    max = probePSizeInTups[i];
-                }
-            }
-            return max;
-        }
-
-    }
-
-    public void closeAndDeleteRunFiles() throws HyracksDataException {
-        for (RunFileWriter rfw : buildRFWriters) {
-            if (rfw != null) {
-                FileUtils.deleteQuietly(rfw.getFileReference().getFile());
-            }
-        }
-        for (RunFileWriter rfw : probeRFWriters) {
-            if (rfw != null) {
-                FileUtils.deleteQuietly(rfw.getFileReference().getFile());
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
index 21e07a5..0dd358c 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
@@ -57,10 +57,10 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes
     private final int[] probeKeys;
     private final int[] buildKeys;
 
-    private final int probeTupleCount;
-    private final int probeMaxDuration;
-    private final int buildTupleCount;
-    private final int buildMaxDuration;
+    private final long probeTupleCount;
+    private final long probeMaxDuration;
+    private final long buildTupleCount;
+    private final long buildMaxDuration;
     private final int avgTuplesPerFrame;
     private final int probeKey;
     private final int buildKey;
@@ -69,8 +69,8 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes
 
     private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoinOperatorDescriptor.class.getName());
 
-    public IntervalPartitionJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int leftTupleCount,
-            int rightTupleCount, int leftMaxDuration, int rightMaxDuration, int avgTuplesPerFrame, int[] leftKeys,
+    public IntervalPartitionJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, long leftTupleCount,
+            long rightTupleCount, long leftMaxDuration, long rightMaxDuration, int avgTuplesPerFrame, int[] leftKeys,
             int[] rightKeys, RecordDescriptor recordDescriptor, IIntervalMergeJoinCheckerFactory imjcf,
             IRangeMap rangeMap) {
         super(spec, 2, 1);
@@ -108,7 +108,7 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes
     }
 
     public static class BuildAndPartitionTaskState extends AbstractStateObject {
-        private IntervalPartitionJoin ipj;
+        private IntervalPartitionJoiner ipj;
         private int intervalPartitions;
         private int partition;
         private int k;
@@ -169,7 +169,7 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes
                     state.intervalPartitions = IntervalPartitionUtil.getMaxPartitions(state.k);
                     state.memoryForJoin = memsize;
                     IIntervalMergeJoinChecker imjc = imjcf.createMergeJoinChecker(buildKeys, probeKeys, partition);
-                    state.ipj = new IntervalPartitionJoin(ctx, state.memoryForJoin, state.k, state.intervalPartitions,
+                    state.ipj = new IntervalPartitionJoiner(ctx, state.memoryForJoin, state.k, state.intervalPartitions,
                             BUILD_REL, PROBE_REL, imjc, buildRd, probeRd, buildHpc, probeHpc);
 
                     state.ipj.initBuild();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
new file mode 100644
index 0000000..5df7b0a
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
@@ -0,0 +1,649 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.operators.joins.intervalpartition;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map.Entry;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinChecker;
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.io.RunFileReader;
+import org.apache.hyracks.dataflow.common.io.RunFileWriter;
+import org.apache.hyracks.dataflow.std.buffermanager.IPartitionedMemoryConstrain;
+import org.apache.hyracks.dataflow.std.buffermanager.VPartitionTupleBufferManager;
+import org.apache.hyracks.dataflow.std.structures.TuplePointer;
+
+/**
+ * This class mainly applies one level of HHJ on a pair of
+ * relations. It is always called by the descriptor.
+ */
+public class IntervalPartitionJoiner {
+
+    // Used for special probe BigObject which can not be held into the Join memory
+    private FrameTupleAppender bigProbeFrameAppender;
+
+    enum SIDE {
+        BUILD,
+        PROBE
+    }
+
+    private IHyracksTaskContext ctx;
+
+    private final String buildRelName;
+    private final String probeRelName;
+
+    private final ITuplePartitionComputer buildHpc;
+    private final ITuplePartitionComputer probeHpc;
+
+    private final RecordDescriptor buildRd;
+    private final RecordDescriptor probeRd;
+
+    private RunFileWriter[] buildRFWriters; //writing spilled build partitions
+    private RunFileWriter[] probeRFWriters; //writing spilled probe partitions
+
+    private final int memForJoin;
+    private final int k;
+    private final int numOfPartitions;
+    private InMemoryIntervalPartitionJoin[] inMemJoiner; //Used for joining resident partitions
+
+    private VPartitionTupleBufferManager buildBufferManager;
+    private VPartitionTupleBufferManager probeBufferManager;
+
+    private final FrameTupleAccessor accessorBuild;
+    private final FrameTupleAccessor accessorProbe;
+
+    private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoiner.class.getName());
+
+    // stats information
+    private IntervalPartitionJoinData ipjd;
+
+    private IFrame reloadBuffer;
+    private TuplePointer tempPtr = new TuplePointer();
+
+    private IIntervalMergeJoinChecker imjc;
+
+    public IntervalPartitionJoiner(IHyracksTaskContext ctx, int memForJoin, int k, int numOfPartitions,
+            String buildRelName, String probeRelName, IIntervalMergeJoinChecker imjc, RecordDescriptor buildRd,
+            RecordDescriptor probeRd, ITuplePartitionComputer buildHpc, ITuplePartitionComputer probeHpc) {
+        this.ctx = ctx;
+        this.memForJoin = memForJoin;
+        this.k = k;
+        this.buildRd = buildRd;
+        this.probeRd = probeRd;
+        this.buildHpc = buildHpc;
+        this.probeHpc = probeHpc;
+        this.imjc = imjc;
+        this.buildRelName = buildRelName;
+        this.probeRelName = probeRelName;
+
+        this.numOfPartitions = numOfPartitions;
+        this.buildRFWriters = new RunFileWriter[numOfPartitions];
+        this.probeRFWriters = new RunFileWriter[numOfPartitions];
+        this.inMemJoiner = new InMemoryIntervalPartitionJoin[numOfPartitions];
+
+        this.accessorBuild = new FrameTupleAccessor(buildRd);
+        this.accessorProbe = new FrameTupleAccessor(probeRd);
+
+        ipjd = new IntervalPartitionJoinData(k, imjc, numOfPartitions);
+    }
+
+    public void initBuild() throws HyracksDataException {
+        buildBufferManager = new VPartitionTupleBufferManager(ctx, getPartitionMemoryConstrain(), numOfPartitions,
+                memForJoin * ctx.getInitialFrameSize());
+    }
+
+    private IPartitionedMemoryConstrain getPartitionMemoryConstrain() {
+        return VPartitionTupleBufferManager.NO_CONSTRAIN;
+    }
+
+    public void build(ByteBuffer buffer) throws HyracksDataException {
+        accessorBuild.reset(buffer);
+        int tupleCount = accessorBuild.getTupleCount();
+
+        for (int i = 0; i < tupleCount; ++i) {
+            int pid = buildHpc.partition(accessorBuild, i, k);
+            processTuple(i, pid);
+            ipjd.buildIncrementCount(pid);
+        }
+    }
+
+    public void closeBuild() throws HyracksDataException {
+        int inMemoryPartitions = 0;
+        int totalBuildPartitions = 0;
+        flushAndClearBuildSpilledPartition();
+
+        // Trying to bring back as many spilled partitions as possible, making them resident
+        bringBackSpilledPartitionIfHasMoreMemory();
+
+        // Update build partition join map based on partitions with actual data.
+        for (int i = ipjd.buildNextInMemory(0); i >= 0; i = ipjd.buildNextInMemory(i + 1)) {
+            if (ipjd.buildGetCount(i) == 0) {
+                ipjd.buildRemoveFromJoin(i);
+            } else if (ipjd.buildGetCount(i) > 0) {
+                // Set up build memory for processing joins for partitions in memory.
+                createInMemoryJoiner(i);
+                inMemoryPartitions++;
+                totalBuildPartitions += ipjd.buildGetCount(i);
+            }
+        }
+
+        if (LOGGER.isLoggable(Level.FINE)) {
+            LOGGER.fine("IntervalPartitionJoin has closed the build phase. Total tuples: " + totalBuildPartitions
+                    + ", In memory partitions: " + inMemoryPartitions + ", Spilled partitions: "
+                    + ipjd.buildGetSpilledCount());
+        }
+    }
+
+    private void processTuple(int tid, int pid) throws HyracksDataException {
+        while (!buildBufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) {
+            int victimPartition = selectPartitionToSpill();
+            if (victimPartition < 0) {
+                throw new HyracksDataException(
+                        "No more space left in the memory buffer, please give join more memory budgets.");
+            }
+            spillPartition(victimPartition);
+        }
+    }
+
+    private int selectPartitionToSpill() {
+        int partitionToSpill = selectLargestSpilledPartition();
+        int maxToSpillPartSize = 0;
+        if (partitionToSpill < 0 || (maxToSpillPartSize = buildBufferManager.getPhysicalSize(partitionToSpill)) == ctx
+                .getInitialFrameSize()) {
+            int partitionInMem = selectNextInMemoryPartitionToSpill();
+            if (partitionInMem >= 0 && buildBufferManager.getPhysicalSize(partitionInMem) > maxToSpillPartSize) {
+                partitionToSpill = partitionInMem;
+            }
+        }
+        return partitionToSpill;
+    }
+
+    /**
+     * Select next partition to spill. The partitions have been numbered in the order they should be spilled.
+     *
+     * @return
+     */
+    private int selectNextInMemoryPartitionToSpill() {
+        for (int i = ipjd.buildNextInMemoryWithResults(0); i >= 0; i = ipjd.buildNextInMemoryWithResults(i + 1)) {
+            if (!ipjd.buildIsSpilled(i) && buildBufferManager.getPhysicalSize(i) > 0) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    private int selectLargestSpilledPartition() {
+        int pid = -1;
+        int max = 0;
+        for (int i = ipjd.buildNextSpilled(0); i >= 0; i = ipjd.buildNextSpilled(i + 1)) {
+            int partSize = buildBufferManager.getPhysicalSize(i);
+            if (partSize > max) {
+                max = partSize;
+                pid = i;
+            }
+        }
+        return pid;
+    }
+
+    private void spillPartition(int pid) throws HyracksDataException {
+        RunFileWriter writer = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.BUILD);
+        buildBufferManager.flushPartition(pid, writer);
+        buildBufferManager.clearPartition(pid);
+        ipjd.buildSpill(pid);
+    }
+
+    private RunFileWriter getSpillWriterOrCreateNewOneIfNotExist(int pid, SIDE whichSide) throws HyracksDataException {
+        RunFileWriter[] runFileWriters = null;
+        String refName = null;
+        switch (whichSide) {
+            case BUILD:
+                runFileWriters = buildRFWriters;
+                refName = buildRelName;
+                break;
+            case PROBE:
+                refName = probeRelName;
+                runFileWriters = probeRFWriters;
+                break;
+            default:
+        }
+        RunFileWriter writer = runFileWriters[pid];
+        if (writer == null) {
+            FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(refName);
+            writer = new RunFileWriter(file, ctx.getIOManager());
+            writer.open();
+            runFileWriters[pid] = writer;
+        }
+        return writer;
+    }
+
+    public void clearBuildMemory() throws HyracksDataException {
+        for (int pid = 0; pid < numOfPartitions; ++pid) {
+            if (buildBufferManager.getNumTuples(pid) > 0) {
+                buildBufferManager.clearPartition(pid);
+                ipjd.buildRemoveFromJoin(pid);
+            }
+        }
+    }
+
+    private void flushAndClearBuildSpilledPartition() throws HyracksDataException {
+        for (int pid = ipjd.buildNextSpilled(0); pid >= 0; pid = ipjd.buildNextSpilled(pid + 1)) {
+            if (buildBufferManager.getNumTuples(pid) > 0) {
+                buildBufferManager.flushPartition(pid, getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.BUILD));
+                buildBufferManager.clearPartition(pid);
+                buildRFWriters[pid].close();
+            }
+        }
+    }
+
+    private void flushAndClearProbeSpilledPartition() throws HyracksDataException {
+        for (int pid = 0; pid < numOfPartitions; ++pid) {
+            if (probeBufferManager.getNumTuples(pid) > 0) {
+                probeBufferManager.flushPartition(pid, getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.PROBE));
+                probeBufferManager.clearPartition(pid);
+                probeRFWriters[pid].close();
+            }
+        }
+    }
+
+    private void bringBackSpilledPartitionIfHasMoreMemory() throws HyracksDataException {
+        // we need number of |spilledPartitions| buffers to store the probe data
+        int freeSpace = (memForJoin - ipjd.buildGetSpilledCount()) * ctx.getInitialFrameSize();
+        for (int i = ipjd.buildNextInMemoryWithResults(0); i >= 0; i = ipjd.buildNextInMemoryWithResults(i + 1)) {
+            freeSpace -= buildBufferManager.getPhysicalSize(i);
+        }
+
+        int pid = 0;
+        while ((pid = selectPartitionsToReload(freeSpace, pid)) >= 0) {
+            if (!loadPartitionInMem(pid, buildRFWriters[pid])) {
+                return;
+            }
+            freeSpace -= buildBufferManager.getPhysicalSize(pid);
+        }
+    }
+
+    /**
+     * Reads the run file of a spilled build partition back into the build buffer manager.
+     * On success the partition is marked as loaded and its run file writer slot is cleared.
+     * If any tuple cannot be inserted, the partially loaded partition is cleared again and
+     * {@code false} is returned.
+     *
+     * The reader is closed in a {@code finally} block so it is released even when reading or
+     * inserting throws.
+     *
+     * @param pid build partition to reload
+     * @param wr run file writer whose file holds the spilled tuples of {@code pid}
+     * @return {@code true} if every tuple was inserted, {@code false} otherwise
+     * @throws HyracksDataException propagated from the reader or the buffer manager
+     */
+    private boolean loadPartitionInMem(int pid, RunFileWriter wr) throws HyracksDataException {
+        // NOTE(review): the reader is a delete-on-close reader, so a failed reload presumably removes
+        // the run file while the partition stays flagged as spilled -- confirm this is intended.
+        RunFileReader r = wr.createDeleteOnCloseReader();
+        r.open();
+        try {
+            if (reloadBuffer == null) {
+                reloadBuffer = new VSizeFrame(ctx);
+            }
+            while (r.nextFrame(reloadBuffer)) {
+                accessorBuild.reset(reloadBuffer.getBuffer());
+                int tupleCount = accessorBuild.getTupleCount();
+                for (int tid = 0; tid < tupleCount; tid++) {
+                    if (!buildBufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) {
+                        // for some reason (e.g. due to fragmentation) if the inserting failed,
+                        // we need to clear the occupied frames
+                        buildBufferManager.clearPartition(pid);
+                        return false;
+                    }
+                }
+            }
+        } finally {
+            r.close();
+        }
+
+        ipjd.buildLoad(pid);
+        buildRFWriters[pid] = null;
+        return true;
+    }
+
+    /**
+     * Returns the first spilled build partition whose run file size fits into
+     * {@code freeSpace}, or -1 when no spilled partition fits.
+     *
+     * @param freeSpace number of bytes available for reloading
+     * @param pid not used by this method; the scan always starts at partition 0
+     * @return id of a partition to reload, or -1
+     */
+    private int selectPartitionsToReload(int freeSpace, int pid) {
+        // NOTE(review): pid is ignored here -- confirm whether the scan was meant to resume from it.
+        int candidate = ipjd.buildNextSpilled(0);
+        while (candidate >= 0) {
+            assert buildRFWriters[candidate].getFileSize() > 0 : "How comes a spilled partition have size 0?";
+            if (buildRFWriters[candidate].getFileSize() <= freeSpace) {
+                return candidate;
+            }
+            candidate = ipjd.buildNextSpilled(candidate + 1);
+        }
+        return -1;
+    }
+
+    /**
+     * Creates the in-memory joiner for build partition {@code pid}, backed by that
+     * partition's frame buffer manager, and stores it in {@code inMemJoiner[pid]}.
+     *
+     * @param pid build partition that gets a joiner
+     * @throws HyracksDataException propagated from the joiner constructor
+     */
+    private void createInMemoryJoiner(int pid) throws HyracksDataException {
+        inMemJoiner[pid] = new InMemoryIntervalPartitionJoin(
+                ctx,
+                buildBufferManager.getPartitionFrameBufferManager(pid),
+                imjc,
+                buildRd,
+                probeRd);
+    }
+
+    /**
+     * Closes the in-memory joiner of build partition {@code pid} against {@code writer}
+     * and drops the reference so the slot reads as "no joiner".
+     *
+     * @param pid build partition whose joiner is closed
+     * @param writer output writer handed to the joiner's {@code closeJoin}
+     * @throws HyracksDataException propagated from {@code closeJoin}
+     */
+    private void closeInMemoryJoiner(int pid, IFrameWriter writer) throws HyracksDataException {
+        inMemJoiner[pid].closeJoin(writer);
+        inMemJoiner[pid] = null;
+    }
+
+    /**
+     * Prepares the probe phase: allocates the probe-side partition buffer manager, sized to
+     * the smaller of {@code numOfPartitions} and {@code memForJoin} frames, and a fresh array
+     * of probe run file writers (one slot per partition, initially empty).
+     *
+     * @throws HyracksDataException propagated from the buffer manager constructor
+     */
+    public void initProbe() throws HyracksDataException {
+        int probeFrames = Math.min(numOfPartitions, memForJoin);
+        int probeBytes = probeFrames * ctx.getInitialFrameSize();
+        probeBufferManager = new VPartitionTupleBufferManager(ctx, getPartitionMemoryConstrain(), numOfPartitions,
+                probeBytes);
+
+        probeRFWriters = new RunFileWriter[numOfPartitions];
+    }
+
+    /**
+     * Processes one frame of probe tuples. For every tuple the probe partition is computed and,
+     * the first time a partition is seen, its set of candidate build partitions is registered.
+     * When candidates exist, the tuple is (a) buffered for later spilling if its probe partition
+     * is flagged as spilled and (b) joined immediately against every candidate build partition
+     * that currently has an in-memory joiner. The per-partition probe tuple count is always
+     * incremented.
+     *
+     * @param buffer frame holding the probe tuples
+     * @param writer output writer that receives join results
+     * @throws HyracksDataException propagated from buffering, spilling, or joining
+     */
+    public void probe(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
+        accessorProbe.reset(buffer);
+        final int total = accessorProbe.getTupleCount();
+
+        for (int tupleIdx = 0; tupleIdx < total; ++tupleIdx) {
+            final int pid = probeHpc.partition(accessorProbe, tupleIdx, k);
+            if (!ipjd.hasProbeJoinMap(pid)) {
+                // Set probe join map
+                ipjd.setProbeJoinMap(pid,
+                        IntervalPartitionUtil.getProbeJoinPartitions(pid, ipjd.buildPSizeInTups, imjc, k));
+            }
+
+            // Tuple has potential match from build phase
+            final boolean hasCandidates = !ipjd.isProbeJoinMapEmpty(pid);
+            if (hasCandidates && ipjd.probeHasSpilled(pid)) {
+                // pid is Spilled: keep retrying the insert, freeing buffer space between attempts
+                while (!probeBufferManager.insertTuple(pid, accessorProbe, tupleIdx, tempPtr)) {
+                    // flush pid itself unless it is empty, in which case choose the biggest one
+                    int victim = probeBufferManager.getNumTuples(pid) == 0 ? selectLargestSpilledPartition() : pid;
+                    if (victim < 0) {
+                        // current tuple is too big for all the free space
+                        flushBigProbeObjectToDisk(pid, accessorProbe, tupleIdx);
+                        break;
+                    }
+                    RunFileWriter victimWriter = getSpillWriterOrCreateNewOneIfNotExist(victim, SIDE.PROBE);
+                    probeBufferManager.flushPartition(victim, victimWriter);
+                    probeBufferManager.clearPartition(victim);
+                }
+            }
+            if (hasCandidates) {
+                // join against the candidate build partitions that are Resident
+                Iterator<Integer> buildPids = ipjd.getProbeJoinMap(pid);
+                while (buildPids.hasNext()) {
+                    int buildPid = buildPids.next();
+                    if (inMemJoiner[buildPid] != null) {
+                        inMemJoiner[buildPid].join(accessorProbe, tupleIdx, writer);
+                    }
+                }
+            }
+            ipjd.probeIncrementCount(pid);
+        }
+    }
+
+    public void closeProbe(IFrameWriter writer) throws HyracksDataException {
+        // We do NOT join the spilled partitions here, that decision is made at the descriptor level (which join technique to use)
+        for (int i = 0; i < inMemJoiner.length; ++i) {
+            if (inMemJoiner[i] != null) {
+                closeInMemoryJoiner(i, writer);
+                ipjd.buildLogJoined(i);
+            }
+        }
+        clearBuildMemory();
+        flushAndClearProbeSpilledPartition();
+        probeBufferManager.close();
+        probeBufferManager = null;
+    }
+
+    /**
+     * Writes a single probe tuple straight to the probe run file of partition {@code pid},
+     * bypassing the probe buffer manager. The dedicated appender is created on first use.
+     *
+     * @param pid probe partition whose run file receives the tuple
+     * @param accessorProbe accessor positioned on the frame that contains the tuple
+     * @param i index of the tuple inside the accessor
+     * @throws HyracksDataException if the tuple does not fit into the appender's frame, or
+     *         propagated from the run file writer
+     */
+    private void flushBigProbeObjectToDisk(int pid, FrameTupleAccessor accessorProbe, int i)
+            throws HyracksDataException {
+        if (bigProbeFrameAppender == null) {
+            bigProbeFrameAppender = new FrameTupleAppender(new VSizeFrame(ctx));
+        }
+        // the writer is obtained before appending, exactly as before, so it exists even on failure
+        RunFileWriter spillWriter = getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.PROBE);
+        boolean appended = bigProbeFrameAppender.append(accessorProbe, i);
+        if (!appended) {
+            throw new HyracksDataException("The given tuple is too big");
+        }
+        bigProbeFrameAppender.write(spillWriter, true);
+    }
+
+    public RunFileReader getBuildRFReader(int pid) throws HyracksDataException {
+        return (buildRFWriters[pid] == null) ? null : (buildRFWriters[pid]).createReader();
+    }
+
+    public RunFileReader getProbeRFReader(int pid) throws HyracksDataException {
+        return (probeRFWriters[pid] == null) ? null : (probeRFWriters[pid]).createReader();
+    }
+
+    public void joinSpilledPartitions(IFrameWriter writer) throws HyracksDataException {
+        LinkedHashMap<Integer, LinkedHashSet<Integer>> probeInMemoryJoinMap;
+        if (reloadBuffer == null) {
+            reloadBuffer = new VSizeFrame(ctx);
+        }
+        HashSet<Integer> inMemory = new HashSet<>();
+        while (ipjd.buildGetSpilledCount() > 0) {
+            // Load back spilled build partitions.
+            // TODO only load partition required for spill join. Consider both sides.
+            bringBackSpilledPartitionIfHasMoreMemory();
+
+            probeInMemoryJoinMap = ipjd.probeGetInMemoryJoinMap();
+
+            // Create in memory joiners.
+            for (int pid = ipjd.buildNextInMemoryWithResults(0); pid >= 0; pid = ipjd
+                    .buildNextInMemoryWithResults(pid + 1)) {
+                createInMemoryJoiner(pid);
+                inMemory.add(pid);
+            }
+
+            // Join all build partitions with disk probe partitions.
+            for (Entry<Integer, LinkedHashSet<Integer>> entry : probeInMemoryJoinMap.entrySet()) {
+                if (ipjd.probeGetCount(entry.getKey()) > 0 && probeInMemoryJoinMap.get(entry.getKey()).isEmpty()) {
+                    RunFileReader pReader = getProbeRFReader(entry.getKey());
+                    pReader.open();
+                    while (pReader.nextFrame(reloadBuffer)) {
+                        accessorProbe.reset(reloadBuffer.getBuffer());
+                        for (int i = 0; i < accessorProbe.getTupleCount(); ++i) {
+                            // Tuple has potential match from build phase
+                            for (Integer j : probeInMemoryJoinMap.get(entry.getKey())) {
+                                // j has join partitions that are Resident
+                                if (inMemJoiner[j] != null) {
+                                    inMemJoiner[j].join(accessorProbe, i, writer);
+                                }
+                            }
+                        }
+                    }
+                    pReader.close();
+                }
+            }
+
+            // Clean up build memory.
+            for (int pid = ipjd.buildNextInMemoryWithResults(0); pid >= 0; pid = ipjd
+                    .buildNextInMemoryWithResults(pid + 1)) {
+                closeInMemoryJoiner(pid, writer);
+                ipjd.buildLogJoined(pid);
+            }
+            inMemory.clear();
+            clearBuildMemory();
+        }
+    }
+
+    /**
+     * Per-join bookkeeping: tuple counts per build/probe partition, status bits for build and
+     * probe partitions, and the map from a probe partition to the build partitions it may join.
+     *
+     * This is an inner (non-static) class: {@link #buildNextInMemory(int)} reads the enclosing
+     * joiner's {@code numOfPartitions}.
+     */
+    class IntervalPartitionJoinData {
+        /** probe partition id -> build partition ids it has to be joined with */
+        private LinkedHashMap<Integer, LinkedHashSet<Integer>> probeJoinMap;
+
+        private int[] buildPSizeInTups;
+        private int[] probePSizeInTups;
+
+        private BitSet buildJoinedCompleted; //0=waiting, 1=joined
+        private BitSet buildSpilledStatus; //0=resident, 1=spilled
+        private BitSet buildInMemoryStatus; //0=unknown, 1=resident
+        private BitSet probeSpilledStatus; //0=resident, 1=spilled
+
+        /**
+         * @param k not referenced by this constructor; kept for signature compatibility
+         * @param imjc not referenced by this constructor; kept for signature compatibility
+         * @param numberOfPartitions size of the per-partition counters and status bit sets
+         */
+        public IntervalPartitionJoinData(int k, IIntervalMergeJoinChecker imjc, int numberOfPartitions) {
+            probeJoinMap = new LinkedHashMap<>();
+
+            buildPSizeInTups = new int[numberOfPartitions];
+            probePSizeInTups = new int[numberOfPartitions];
+
+            buildJoinedCompleted = new BitSet(numberOfPartitions);
+            buildInMemoryStatus = new BitSet(numberOfPartitions);
+            buildSpilledStatus = new BitSet(numberOfPartitions);
+            probeSpilledStatus = new BitSet(numberOfPartitions);
+        }
+
+        /** Delegates to {@code IntervalPartitionUtil.getInMemorySpillJoinMap} with the current state. */
+        public LinkedHashMap<Integer, LinkedHashSet<Integer>> probeGetInMemoryJoinMap() {
+            return IntervalPartitionUtil.getInMemorySpillJoinMap(probeJoinMap, buildInMemoryStatus, probeSpilledStatus);
+        }
+
+        /** @return true once a join set has been registered for probe partition {@code pid} */
+        public boolean hasProbeJoinMap(int pid) {
+            return probeJoinMap.containsKey(pid);
+        }
+
+        /** Requires a registered join set for {@code pid}; fails with a NullPointerException otherwise. */
+        public boolean isProbeJoinMapEmpty(int pid) {
+            return probeJoinMap.get(pid).isEmpty();
+        }
+
+        /** Requires a registered join set for {@code pid}; fails with a NullPointerException otherwise. */
+        public Iterator<Integer> getProbeJoinMap(int pid) {
+            return probeJoinMap.get(pid).iterator();
+        }
+
+        /**
+         * Registers the build partitions that probe partition {@code pid} joins with, and flags
+         * the probe partition as spilled when any of those build partitions is spilled.
+         */
+        public void setProbeJoinMap(int pid, LinkedHashSet<Integer> map) {
+            // autoboxing replaces the deprecated Integer(int) constructor
+            probeJoinMap.put(pid, map);
+            for (Integer i : map) {
+                if (buildIsSpilled(i)) {
+                    // Build join partition has spilled. Now spill the probe also.
+                    probeSpilledStatus.set(pid);
+                    // one spilled build partition is enough; the flag cannot change further
+                    break;
+                }
+            }
+        }
+
+        /** Counts one more build tuple for {@code pid} and marks the partition as in memory. */
+        public void buildIncrementCount(int pid) {
+            buildInMemoryStatus.set(pid);
+            buildPSizeInTups[pid]++;
+        }
+
+        public int buildGetCount(int pid) {
+            return buildPSizeInTups[pid];
+        }
+
+        /** Marks build partition {@code pid} as joined and no longer spilled. */
+        public void buildLogJoined(int pid) {
+            buildSpilledStatus.clear(pid);
+            buildJoinedCompleted.set(pid);
+        }
+
+        /** Performs the same state change as {@link #buildLogJoined(int)}. */
+        public void buildRemoveFromJoin(int pid) {
+            buildLogJoined(pid);
+        }
+
+        public boolean buildHasBeenJoined(int pid) {
+            return buildJoinedCompleted.get(pid);
+        }
+
+        /** @return number of build partitions currently flagged as spilled */
+        public int buildGetSpilledCount() {
+            return buildSpilledStatus.cardinality();
+        }
+
+        /** Flags build partition {@code pid} as spilled and not in memory. */
+        public void buildSpill(int pid) {
+            buildInMemoryStatus.clear(pid);
+            buildSpilledStatus.set(pid);
+        }
+
+        /** Flags build partition {@code pid} as in memory and not spilled. */
+        public void buildLoad(int pid) {
+            buildInMemoryStatus.set(pid);
+            buildSpilledStatus.clear(pid);
+        }
+
+        public boolean buildIsSpilled(int pid) {
+            return buildSpilledStatus.get(pid);
+        }
+
+        /** @return the first spilled build partition at or after {@code pid}, or -1 if none */
+        public int buildNextSpilled(int pid) {
+            return buildSpilledStatus.nextSetBit(pid);
+        }
+
+        /**
+         * @return the first build partition at or after {@code pid} that is not spilled, not yet
+         *         joined, and has a tuple count above zero; -1 if there is none
+         */
+        public int buildNextInMemoryWithResults(int pid) {
+            for (int next = buildNextInMemory(pid); next >= 0; next = buildNextInMemory(next + 1)) {
+                if (buildGetCount(next) > 0) {
+                    return next;
+                }
+            }
+            return -1;
+        }
+
+        /**
+         * @return the first build partition at or after {@code pid} (and below the enclosing
+         *         joiner's {@code numOfPartitions}) that is neither spilled nor joined; -1 if none
+         */
+        public int buildNextInMemory(int pid) {
+            // BitSet.nextClearBit never returns a negative index, so only the upper bound is checked
+            for (int next = buildSpilledStatus.nextClearBit(pid); next < numOfPartitions; next = buildSpilledStatus
+                    .nextClearBit(next + 1)) {
+                if (!buildHasBeenJoined(next)) {
+                    return next;
+                }
+            }
+            return -1;
+        }
+
+        public void probeIncrementCount(int pid) {
+            probePSizeInTups[pid]++;
+        }
+
+        public int probeGetCount(int pid) {
+            return probePSizeInTups[pid];
+        }
+
+        public void probeSpill(int pid) {
+            probeSpilledStatus.set(pid);
+        }
+
+        public boolean probeHasSpilled(int pid) {
+            return probeSpilledStatus.get(pid);
+        }
+
+        /** @return the largest build partition tuple count, or 0 when there are no partitions */
+        public int buildGetMaxPartitionSize() {
+            return largest(buildPSizeInTups);
+        }
+
+        /** @return the largest probe partition tuple count, or 0 when there are no partitions */
+        public int probeGetMaxPartitionSize() {
+            return largest(probePSizeInTups);
+        }
+
+        /** Largest element of {@code sizes}; 0 for an empty array instead of an index error. */
+        private int largest(int[] sizes) {
+            if (sizes.length == 0) {
+                return 0;
+            }
+            int max = sizes[0];
+            for (int i = 1; i < sizes.length; i++) {
+                if (sizes[i] > max) {
+                    max = sizes[i];
+                }
+            }
+            return max;
+        }
+
+    }
+
+    /**
+     * Deletes the run files behind every build and probe run file writer that is still
+     * referenced. Deletion is best effort ({@code FileUtils.deleteQuietly}). A writer array
+     * that was never allocated (for example {@code probeRFWriters} before {@link #initProbe()})
+     * is skipped instead of causing a NullPointerException.
+     *
+     * NOTE(review): despite the name, the writers themselves are not closed here -- confirm
+     * that they have been closed by the time this is called.
+     *
+     * @throws HyracksDataException declared for interface compatibility
+     */
+    public void closeAndDeleteRunFiles() throws HyracksDataException {
+        deleteRunFilesQuietly(buildRFWriters);
+        deleteRunFilesQuietly(probeRFWriters);
+    }
+
+    /** Deletes the file of every non-null writer in {@code writers}; a null array is a no-op. */
+    private static void deleteRunFilesQuietly(RunFileWriter[] writers) {
+        if (writers == null) {
+            return;
+        }
+        for (RunFileWriter rfw : writers) {
+            if (rfw != null) {
+                FileUtils.deleteQuietly(rfw.getFileReference().getFile());
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java
index e05c06e..c6e95e1 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java
@@ -37,15 +37,15 @@ public class IntervalPartitionUtil {
     private IntervalPartitionUtil() {
     }
 
-    public static int determineK(int countR, int maxDurationR, int countS, int maxDurationS, int avgTuplePerFrame) {
+    public static int determineK(long countR, long maxDurationR, long countS, long maxDurationS, int avgTuplePerFrame) {
         double deltaR = 1.0 / maxDurationR;
         double deltaS = 1.0 / maxDurationS;
 
-        int knMinusTwo = 0;
-        int knMinusOne = 0;
-        int kn = 1;
+        long knMinusTwo = 0;
+        long knMinusOne = 0;
+        long kn = 1;
 
-        int prn = determinePn(kn, countR, deltaR);
+        long prn = determinePn(kn, countR, deltaR);
         double tn = determineTn(kn, determinePn(kn, countS, deltaS));
 
         while ((kn != knMinusOne) && (kn != knMinusTwo)) {
@@ -55,21 +55,25 @@ public class IntervalPartitionUtil {
             prn = determinePn(kn, countR, deltaR);
             tn = determineTn(kn, determinePn(kn, countS, deltaS));
         }
-        return kn;
+        if (kn > Integer.MAX_VALUE) {
+            return Integer.MAX_VALUE;
+        } else {
+            return (int) kn;
+        }
     }
 
-    public static int determineKn(int countR, int countS, int avgTuplePerFrame, int prn, double tn) {
+    public static long determineKn(long countR, long countS, int avgTuplePerFrame, long prn, double tn) { // counts and result widened from int to long
         double factorS = (3.0 * countS) / (2 * (C_IO + 2 * C_CPU) * tn);
         double factorR = (C_IO / avgTuplePerFrame) + ((4.0 * countR * C_CPU) / prn);
-        return (int) Math.cbrt(factorS * factorR);
+        return (long) Math.cbrt(factorS * factorR); // cube root of factorS * factorR; the cast drops the fraction
     }
 
-    public static int determinePn(int kn, int count, double delta) {
+    public static long determinePn(long kn, long count, double delta) { // kn, count and result widened from int to long
         long knDelta = (long) Math.ceil(kn * delta);
-        return Math.min((int) ((kn * knDelta) + kn - ((knDelta * knDelta) / 2.0) - (knDelta / 2.0)), count);
+        return Math.min((long) ((kn * knDelta) + kn - ((knDelta * knDelta) / 2.0) - (knDelta / 2.0)), count); // capped at count
     }
 
-    public static double determineTn(int kn, int pn) {
+    public static double determineTn(long kn, long pn) { // returns pn / ((kn * kn + kn) / 2.0); parameters widened to long
         return pn / ((kn * kn + kn) / 2.0);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/AbstractExpressionAnnotation.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/AbstractExpressionAnnotation.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/AbstractExpressionAnnotation.java
new file mode 100644
index 0000000..de02572
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/AbstractExpressionAnnotation.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.expressions;
+
+/**
+ * Base class for expression annotations that carry a single payload object. It supplies
+ * the {@code getObject}/{@code setObject} pair; the remaining methods of
+ * {@link IExpressionAnnotation} are left to subclasses.
+ */
+public abstract class AbstractExpressionAnnotation implements IExpressionAnnotation {
+
+    /** Payload attached to this annotation; {@code null} until {@link #setObject(Object)} is called. */
+    protected Object object;
+
+    /**
+     * @return the payload most recently stored with {@link #setObject(Object)}, or {@code null}
+     */
+    @Override
+    public Object getObject() {
+        return this.object;
+    }
+
+    /**
+     * Stores {@code object} as the payload, replacing any previous value.
+     *
+     * @param object payload to keep; not inspected or copied
+     */
+    @Override
+    public void setObject(final Object object) {
+        this.object = object;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ExpressionAnnotationNoCopyImpl.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ExpressionAnnotationNoCopyImpl.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ExpressionAnnotationNoCopyImpl.java
index 140dfb1..3aa34c8 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ExpressionAnnotationNoCopyImpl.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ExpressionAnnotationNoCopyImpl.java
@@ -18,23 +18,11 @@
  */
 package org.apache.hyracks.algebricks.core.algebra.expressions;
 
-public class ExpressionAnnotationNoCopyImpl implements IExpressionAnnotation {
-
-    private Object object;
+public class ExpressionAnnotationNoCopyImpl extends AbstractExpressionAnnotation { // getObject/setObject now inherited; copy() returns this
 
     @Override
     public IExpressionAnnotation copy() {
         return this;
     }
 
-    @Override
-    public Object getObject() {
-        return object;
-    }
-
-    @Override
-    public void setObject(Object object) {
-        this.object = object;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fd84e345/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/IndexedNLJoinExpressionAnnotation.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/IndexedNLJoinExpressionAnnotation.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/IndexedNLJoinExpressionAnnotation.java
index 5ee6b07..91c0a8b 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/IndexedNLJoinExpressionAnnotation.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/IndexedNLJoinExpressionAnnotation.java
@@ -18,23 +18,11 @@
  */
 package org.apache.hyracks.algebricks.core.algebra.expressions;
 
-public class IndexedNLJoinExpressionAnnotation implements IExpressionAnnotation {
+public class IndexedNLJoinExpressionAnnotation extends AbstractExpressionAnnotation {
 
     public static final String HINT_STRING = "indexnl";
     public static final IndexedNLJoinExpressionAnnotation INSTANCE = new IndexedNLJoinExpressionAnnotation();
 
-    private Object object;
-
-    @Override
-    public Object getObject() {
-        return object;
-    }
-
-    @Override
-    public void setObject(Object object) {
-        this.object = object;
-    }
-
     @Override
     public IExpressionAnnotation copy() {
         IndexedNLJoinExpressionAnnotation clone = new IndexedNLJoinExpressionAnnotation();