You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2016/05/19 15:38:28 UTC

[2/4] git commit: updated refs/heads/trunk to 4321e44

http://git-wip-us.apache.org/repos/asf/giraph/blob/4321e448/giraph-core/src/main/java/org/apache/giraph/types/heaps/FixedCapacityIntFloatMinHeap.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/heaps/FixedCapacityIntFloatMinHeap.java b/giraph-core/src/main/java/org/apache/giraph/types/heaps/FixedCapacityIntFloatMinHeap.java
new file mode 100644
index 0000000..f1a7253
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/types/heaps/FixedCapacityIntFloatMinHeap.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.types.heaps;
+
+import it.unimi.dsi.fastutil.ints.AbstractInt2FloatMap;
+import it.unimi.dsi.fastutil.ints.Int2FloatMap;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+import org.apache.giraph.function.primitive.pairs.IntFloatConsumer;
+import org.apache.giraph.function.primitive.pairs.IntFloatPredicate;
+
+// AUTO-GENERATED class via class:
+// org.apache.giraph.generate.GeneratePrimitiveClasses
+
+/**
+ * Min heap which holds (int key, float value) pairs with
+ * the largest values as its elements, up to the given maximum number of
+ * elements.
+ *
+ * When multiple elements with same values are added and there is no space for
+ * all of them in the heap, the one with larger keys will be kept in the heap.
+ *
+ * You can remove a pair with the minimum value currently in the heap.
+ */
+public class FixedCapacityIntFloatMinHeap
+    implements Int2FloatMapEntryIterable {
+  /** Keys in the heap */
+  private final int[] keys;
+  /** Values in the heap */
+  private final float[] values;
+  /** Number of elements currently in the heap */
+  private int size;
+  /** Capacity of the heap */
+  private final int capacity;
+  /** Reusable iterator instance */
+  private final IteratorImpl iterator;
+
+  /**
+   * Initialize the heap with desired capacity
+   *
+   * @param capacity Capacity
+   */
+  public FixedCapacityIntFloatMinHeap(int capacity) {
+    keys = new int[capacity];
+    values = new float[capacity];
+    size = 0;
+    this.capacity = capacity;
+    iterator = new IteratorImpl();
+  }
+
+  /** Clear the heap */
+  public void clear() {
+    size = 0;
+  }
+
+  /**
+   * Add a key value pair
+   *
+   * @param key   Key
+   * @param value Value
+   */
+  public void add(int key, float value) {
+    if (size == capacity && compare(keys[0], values[0], key, value) >= 0) {
+      // If the heap is full and smallest element in it is not smaller
+      // than value, do nothing
+      return;
+    }
+    int position;
+    if (size < capacity) {
+      // If the heap is not full, increase its size and find the position for
+      // new element (up-heap search)
+      position = size;
+      size++;
+      while (position > 0) {
+        int parent = (position - 1) >> 1;
+        if (compare(keys[parent], values[parent], key, value) < 0) {
+          break;
+        }
+        values[position] = values[parent];
+        keys[position] = keys[parent];
+        position = parent;
+      }
+    } else {
+      // If the heap is full, remove element from the root and find the position
+      // for new element (down-heap search)
+      position = removeRootAndFindPosition(key, value);
+    }
+    // Fill position with key value pair
+    keys[position] = key;
+    values[position] = value;
+  }
+
+  /**
+   * @return Key corresponding to the minimum value currently in the heap
+   * @throws NoSuchElementException if the heap is empty.
+   */
+  public int getMinKey() {
+    if (size() > 0) {
+      return keys[0];
+    } else {
+      throw new NoSuchElementException();
+    }
+  }
+
+  /**
+   * @return Minimum value currently in the heap
+   * @throws NoSuchElementException if the heap is empty.
+   */
+  public float getMinValue() {
+    if (size() > 0) {
+      return values[0];
+    } else {
+      throw new NoSuchElementException();
+    }
+  }
+
+  /**
+   * Removes the (key, value) pair that corresponds to the minimum value
+   * currently in the heap.
+   */
+  public void removeMin() {
+    if (size() > 0) {
+      size--;
+      int position = removeRootAndFindPosition(keys[size], values[size]);
+      keys[position] = keys[size];
+      values[position] = values[size];
+    }
+  }
+
+  /**
+   * Comapre two (key, value) entries
+   *
+   * @param key1   First key
+   * @param value1 First value
+   * @param key2   Second key
+   * @param value2 Second value
+   * @return 0 if entries are equal, < 0 if first entry is smaller than the
+   * second one, and > 0 if first entry is larger than the second one
+   */
+  protected int compare(int key1, float value1,
+      int key2, float value2) {
+    int t = Float.compare(value1, value2);
+    return (t == 0) ? Integer.compare(key1, key2) : t;
+  }
+
+  @Override
+  public ObjectIterator<Int2FloatMap.Entry> iterator() {
+    iterator.reset();
+    return iterator;
+  }
+
+  @Override
+  public int size() {
+    return size;
+  }
+
+  /**
+   * Check if the heap is empty
+   *
+   * @return True iff the heap is empty
+   */
+  public boolean isEmpty() {
+    return size == 0;
+  }
+
+  /**
+   * Get capacity of the heap
+   *
+   * @return Heap capacity
+   */
+  public int getCapacity() {
+    return capacity;
+  }
+
+  /**
+   * Serializes an object into data output.
+   *
+   * @param heap Object instance to serialize
+   * @param out  Data output
+   * @throws java.io.IOException
+   */
+  public static void write(FixedCapacityIntFloatMinHeap heap,
+      DataOutput out) throws IOException {
+    out.writeInt(heap.capacity);
+    out.writeInt(heap.size);
+    for (int i = 0; i < heap.size(); i++) {
+      out.writeInt(heap.keys[i]);
+      out.writeFloat(heap.values[i]);
+    }
+  }
+
+  /**
+   * Deserializes an object from data input.
+   *
+   * @param heap Object to reuse if possible
+   * @param in   Data input
+   * @return FixedCapacityIntFloatMinHeap deserialized from data input.
+   * @throws IOException
+   */
+  public static FixedCapacityIntFloatMinHeap read(
+      FixedCapacityIntFloatMinHeap heap, DataInput in)
+      throws IOException {
+    int capacity = in.readInt();
+    if (heap == null || heap.capacity != capacity) {
+      heap = new FixedCapacityIntFloatMinHeap(capacity);
+    } else {
+      heap.clear();
+    }
+    heap.size = in.readInt();
+    for (int i = 0; i < heap.size; i++) {
+      heap.keys[i] = in.readInt();
+      heap.values[i] = in.readFloat();
+    }
+    return heap;
+  }
+
+  /**
+   * Takes a (key, value) pair, removes the root of the heap, and finds
+   * a position where the pair can be inserted.
+   *
+   * @param key   Key
+   * @param value Value
+   * @return Position in the heap where the (key, value) pair can be inserted
+   * while preserving the heap property.
+   */
+  private int removeRootAndFindPosition(int key, float value) {
+    int position = 0;
+    while (position < size) {
+      // Find the left child
+      int minChild = (position << 1) + 1;
+      // Compare the left and the right child values - find the smaller one
+      if (minChild + 1 < size &&
+          compare(keys[minChild + 1], values[minChild + 1],
+              keys[minChild], values[minChild]) < 0) {
+        minChild++;
+      }
+      if (minChild >= size || compare(keys[minChild], values[minChild],
+          key, value) >= 0) {
+        break;
+      }
+      keys[position] = keys[minChild];
+      values[position] = values[minChild];
+      position = minChild;
+    }
+    return position;
+  }
+
+  /**
+   * Traverse all elements of the heap, calling given function on each element.
+   *
+   * @param f Function to call on each element.
+   */
+  public void forEachIntFloat(IntFloatConsumer f) {
+    for (int i = 0; i < size(); ++i) {
+      f.apply(keys[i], values[i]);
+    }
+  }
+
+  /**
+   * Traverse all elements of the heap, calling given function on each element,
+   * or until predicate returns false.
+   *
+   * @param f Function to call on each element.
+   * @return true if the predicate returned true for all elements,
+   *    false if it returned false for some element.
+   */
+  public boolean forEachWhileIntFloat(IntFloatPredicate f) {
+    for (int i = 0; i < size(); ++i) {
+      if (!f.apply(keys[i], values[i])) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /** Iterator for FixedCapacityIntFloatMinHeap */
+  private class IteratorImpl implements ObjectIterator<Int2FloatMap.Entry> {
+    /** Reusable entry */
+    private final MutableEntry entry = new MutableEntry();
+    /** Current index */
+    private int index;
+
+    /** Reset the iterator so it can be reused */
+    public void reset() {
+      index = -1;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return index < size - 1;
+    }
+
+    @Override
+    public Int2FloatMap.Entry next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      index++;
+      entry.setIntKey(keys[index]);
+      entry.setFloatValue(values[index]);
+      return entry;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("remove() shouldn't be called");
+    }
+
+    @Override
+    public int skip(int i) {
+      throw new UnsupportedOperationException("skip(int) shouldn't be called");
+    }
+  }
+
+  /** Helper mutable Entry class */
+  private static class MutableEntry extends AbstractInt2FloatMap.BasicEntry {
+    /** Default constructor */
+    private MutableEntry() {
+      super(0, 0);
+    }
+
+    /**
+     * Set key
+     *
+     * @param key Key to set
+     */
+    private void setIntKey(int key) {
+      this.key = key;
+    }
+
+    /**
+     * Set value
+     *
+     * @param value Value to set
+     */
+    private void setFloatValue(float value) {
+      this.value = value;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4321e448/giraph-core/src/main/java/org/apache/giraph/types/heaps/FixedCapacityIntIntMinHeap.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/heaps/FixedCapacityIntIntMinHeap.java b/giraph-core/src/main/java/org/apache/giraph/types/heaps/FixedCapacityIntIntMinHeap.java
new file mode 100644
index 0000000..a6865f1
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/types/heaps/FixedCapacityIntIntMinHeap.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.types.heaps;
+
+import it.unimi.dsi.fastutil.ints.AbstractInt2IntMap;
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+import org.apache.giraph.function.primitive.pairs.IntIntConsumer;
+import org.apache.giraph.function.primitive.pairs.IntIntPredicate;
+
+// AUTO-GENERATED class via class:
+// org.apache.giraph.generate.GeneratePrimitiveClasses
+
+/**
+ * Min heap which holds (int key, int value) pairs with
+ * the largest values as its elements, up to the given maximum number of
+ * elements.
+ *
+ * When multiple elements with same values are added and there is no space for
+ * all of them in the heap, the one with larger keys will be kept in the heap.
+ *
+ * You can remove a pair with the minimum value currently in the heap.
+ */
+public class FixedCapacityIntIntMinHeap
+    implements Int2IntMapEntryIterable {
+  /** Keys in the heap */
+  private final int[] keys;
+  /** Values in the heap */
+  private final int[] values;
+  /** Number of elements currently in the heap */
+  private int size;
+  /** Capacity of the heap */
+  private final int capacity;
+  /** Reusable iterator instance */
+  private final IteratorImpl iterator;
+
+  /**
+   * Initialize the heap with desired capacity
+   *
+   * @param capacity Capacity
+   */
+  public FixedCapacityIntIntMinHeap(int capacity) {
+    keys = new int[capacity];
+    values = new int[capacity];
+    size = 0;
+    this.capacity = capacity;
+    iterator = new IteratorImpl();
+  }
+
+  /** Clear the heap */
+  public void clear() {
+    size = 0;
+  }
+
+  /**
+   * Add a key value pair
+   *
+   * @param key   Key
+   * @param value Value
+   */
+  public void add(int key, int value) {
+    if (size == capacity && compare(keys[0], values[0], key, value) >= 0) {
+      // If the heap is full and smallest element in it is not smaller
+      // than value, do nothing
+      return;
+    }
+    int position;
+    if (size < capacity) {
+      // If the heap is not full, increase its size and find the position for
+      // new element (up-heap search)
+      position = size;
+      size++;
+      while (position > 0) {
+        int parent = (position - 1) >> 1;
+        if (compare(keys[parent], values[parent], key, value) < 0) {
+          break;
+        }
+        values[position] = values[parent];
+        keys[position] = keys[parent];
+        position = parent;
+      }
+    } else {
+      // If the heap is full, remove element from the root and find the position
+      // for new element (down-heap search)
+      position = removeRootAndFindPosition(key, value);
+    }
+    // Fill position with key value pair
+    keys[position] = key;
+    values[position] = value;
+  }
+
+  /**
+   * @return Key corresponding to the minimum value currently in the heap
+   * @throws NoSuchElementException if the heap is empty.
+   */
+  public int getMinKey() {
+    if (size() > 0) {
+      return keys[0];
+    } else {
+      throw new NoSuchElementException();
+    }
+  }
+
+  /**
+   * @return Minimum value currently in the heap
+   * @throws NoSuchElementException if the heap is empty.
+   */
+  public int getMinValue() {
+    if (size() > 0) {
+      return values[0];
+    } else {
+      throw new NoSuchElementException();
+    }
+  }
+
+  /**
+   * Removes the (key, value) pair that corresponds to the minimum value
+   * currently in the heap.
+   */
+  public void removeMin() {
+    if (size() > 0) {
+      size--;
+      int position = removeRootAndFindPosition(keys[size], values[size]);
+      keys[position] = keys[size];
+      values[position] = values[size];
+    }
+  }
+
+  /**
+   * Comapre two (key, value) entries
+   *
+   * @param key1   First key
+   * @param value1 First value
+   * @param key2   Second key
+   * @param value2 Second value
+   * @return 0 if entries are equal, < 0 if first entry is smaller than the
+   * second one, and > 0 if first entry is larger than the second one
+   */
+  protected int compare(int key1, int value1,
+      int key2, int value2) {
+    int t = Integer.compare(value1, value2);
+    return (t == 0) ? Integer.compare(key1, key2) : t;
+  }
+
+  @Override
+  public ObjectIterator<Int2IntMap.Entry> iterator() {
+    iterator.reset();
+    return iterator;
+  }
+
+  @Override
+  public int size() {
+    return size;
+  }
+
+  /**
+   * Check if the heap is empty
+   *
+   * @return True iff the heap is empty
+   */
+  public boolean isEmpty() {
+    return size == 0;
+  }
+
+  /**
+   * Get capacity of the heap
+   *
+   * @return Heap capacity
+   */
+  public int getCapacity() {
+    return capacity;
+  }
+
+  /**
+   * Serializes an object into data output.
+   *
+   * @param heap Object instance to serialize
+   * @param out  Data output
+   * @throws java.io.IOException
+   */
+  public static void write(FixedCapacityIntIntMinHeap heap,
+      DataOutput out) throws IOException {
+    out.writeInt(heap.capacity);
+    out.writeInt(heap.size);
+    for (int i = 0; i < heap.size(); i++) {
+      out.writeInt(heap.keys[i]);
+      out.writeInt(heap.values[i]);
+    }
+  }
+
+  /**
+   * Deserializes an object from data input.
+   *
+   * @param heap Object to reuse if possible
+   * @param in   Data input
+   * @return FixedCapacityIntIntMinHeap deserialized from data input.
+   * @throws IOException
+   */
+  public static FixedCapacityIntIntMinHeap read(
+      FixedCapacityIntIntMinHeap heap, DataInput in)
+      throws IOException {
+    int capacity = in.readInt();
+    if (heap == null || heap.capacity != capacity) {
+      heap = new FixedCapacityIntIntMinHeap(capacity);
+    } else {
+      heap.clear();
+    }
+    heap.size = in.readInt();
+    for (int i = 0; i < heap.size; i++) {
+      heap.keys[i] = in.readInt();
+      heap.values[i] = in.readInt();
+    }
+    return heap;
+  }
+
+  /**
+   * Takes a (key, value) pair, removes the root of the heap, and finds
+   * a position where the pair can be inserted.
+   *
+   * @param key   Key
+   * @param value Value
+   * @return Position in the heap where the (key, value) pair can be inserted
+   * while preserving the heap property.
+   */
+  private int removeRootAndFindPosition(int key, int value) {
+    int position = 0;
+    while (position < size) {
+      // Find the left child
+      int minChild = (position << 1) + 1;
+      // Compare the left and the right child values - find the smaller one
+      if (minChild + 1 < size &&
+          compare(keys[minChild + 1], values[minChild + 1],
+              keys[minChild], values[minChild]) < 0) {
+        minChild++;
+      }
+      if (minChild >= size || compare(keys[minChild], values[minChild],
+          key, value) >= 0) {
+        break;
+      }
+      keys[position] = keys[minChild];
+      values[position] = values[minChild];
+      position = minChild;
+    }
+    return position;
+  }
+
+  /**
+   * Traverse all elements of the heap, calling given function on each element.
+   *
+   * @param f Function to call on each element.
+   */
+  public void forEachIntInt(IntIntConsumer f) {
+    for (int i = 0; i < size(); ++i) {
+      f.apply(keys[i], values[i]);
+    }
+  }
+
+  /**
+   * Traverse all elements of the heap, calling given function on each element,
+   * or until predicate returns false.
+   *
+   * @param f Function to call on each element.
+   * @return true if the predicate returned true for all elements,
+   *    false if it returned false for some element.
+   */
+  public boolean forEachWhileIntInt(IntIntPredicate f) {
+    for (int i = 0; i < size(); ++i) {
+      if (!f.apply(keys[i], values[i])) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /** Iterator for FixedCapacityIntIntMinHeap */
+  private class IteratorImpl implements ObjectIterator<Int2IntMap.Entry> {
+    /** Reusable entry */
+    private final MutableEntry entry = new MutableEntry();
+    /** Current index */
+    private int index;
+
+    /** Reset the iterator so it can be reused */
+    public void reset() {
+      index = -1;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return index < size - 1;
+    }
+
+    @Override
+    public Int2IntMap.Entry next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      index++;
+      entry.setIntKey(keys[index]);
+      entry.setIntValue(values[index]);
+      return entry;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("remove() shouldn't be called");
+    }
+
+    @Override
+    public int skip(int i) {
+      throw new UnsupportedOperationException("skip(int) shouldn't be called");
+    }
+  }
+
+  /** Helper mutable Entry class */
+  private static class MutableEntry extends AbstractInt2IntMap.BasicEntry {
+    /** Default constructor */
+    private MutableEntry() {
+      super(0, 0);
+    }
+
+    /**
+     * Set key
+     *
+     * @param key Key to set
+     */
+    private void setIntKey(int key) {
+      this.key = key;
+    }
+
+    /**
+     * Set value
+     *
+     * @param value Value to set
+     */
+    private void setIntValue(int value) {
+      this.value = value;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4321e448/giraph-core/src/main/java/org/apache/giraph/types/heaps/FixedCapacityIntLongMinHeap.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/heaps/FixedCapacityIntLongMinHeap.java b/giraph-core/src/main/java/org/apache/giraph/types/heaps/FixedCapacityIntLongMinHeap.java
new file mode 100644
index 0000000..b07c5b2
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/types/heaps/FixedCapacityIntLongMinHeap.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.types.heaps;
+
+import it.unimi.dsi.fastutil.ints.AbstractInt2LongMap;
+import it.unimi.dsi.fastutil.ints.Int2LongMap;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+import org.apache.giraph.function.primitive.pairs.IntLongConsumer;
+import org.apache.giraph.function.primitive.pairs.IntLongPredicate;
+
+// AUTO-GENERATED class via class:
+// org.apache.giraph.generate.GeneratePrimitiveClasses
+
+/**
+ * Min heap which holds (int key, long value) pairs with
+ * the largest values as its elements, up to the given maximum number of
+ * elements.
+ *
+ * When multiple elements with same values are added and there is no space for
+ * all of them in the heap, the one with larger keys will be kept in the heap.
+ *
+ * You can remove a pair with the minimum value currently in the heap.
+ */
+public class FixedCapacityIntLongMinHeap
+    implements Int2LongMapEntryIterable {
+  /** Keys in the heap */
+  private final int[] keys;
+  /** Values in the heap */
+  private final long[] values;
+  /** Number of elements currently in the heap */
+  private int size;
+  /** Capacity of the heap */
+  private final int capacity;
+  /** Reusable iterator instance */
+  private final IteratorImpl iterator;
+
+  /**
+   * Initialize the heap with desired capacity
+   *
+   * @param capacity Capacity
+   */
+  public FixedCapacityIntLongMinHeap(int capacity) {
+    keys = new int[capacity];
+    values = new long[capacity];
+    size = 0;
+    this.capacity = capacity;
+    iterator = new IteratorImpl();
+  }
+
+  /** Clear the heap */
+  public void clear() {
+    size = 0;
+  }
+
+  /**
+   * Add a key value pair
+   *
+   * @param key   Key
+   * @param value Value
+   */
+  public void add(int key, long value) {
+    if (size == capacity && compare(keys[0], values[0], key, value) >= 0) {
+      // If the heap is full and smallest element in it is not smaller
+      // than value, do nothing
+      return;
+    }
+    int position;
+    if (size < capacity) {
+      // If the heap is not full, increase its size and find the position for
+      // new element (up-heap search)
+      position = size;
+      size++;
+      while (position > 0) {
+        int parent = (position - 1) >> 1;
+        if (compare(keys[parent], values[parent], key, value) < 0) {
+          break;
+        }
+        values[position] = values[parent];
+        keys[position] = keys[parent];
+        position = parent;
+      }
+    } else {
+      // If the heap is full, remove element from the root and find the position
+      // for new element (down-heap search)
+      position = removeRootAndFindPosition(key, value);
+    }
+    // Fill position with key value pair
+    keys[position] = key;
+    values[position] = value;
+  }
+
+  /**
+   * @return Key corresponding to the minimum value currently in the heap
+   * @throws NoSuchElementException if the heap is empty.
+   */
+  public int getMinKey() {
+    if (size() > 0) {
+      return keys[0];
+    } else {
+      throw new NoSuchElementException();
+    }
+  }
+
+  /**
+   * @return Minimum value currently in the heap
+   * @throws NoSuchElementException if the heap is empty.
+   */
+  public long getMinValue() {
+    if (size() > 0) {
+      return values[0];
+    } else {
+      throw new NoSuchElementException();
+    }
+  }
+
+  /**
+   * Removes the (key, value) pair that corresponds to the minimum value
+   * currently in the heap.
+   */
+  public void removeMin() {
+    if (size() > 0) {
+      size--;
+      int position = removeRootAndFindPosition(keys[size], values[size]);
+      keys[position] = keys[size];
+      values[position] = values[size];
+    }
+  }
+
+  /**
+   * Comapre two (key, value) entries
+   *
+   * @param key1   First key
+   * @param value1 First value
+   * @param key2   Second key
+   * @param value2 Second value
+   * @return 0 if entries are equal, < 0 if first entry is smaller than the
+   * second one, and > 0 if first entry is larger than the second one
+   */
+  protected int compare(int key1, long value1,
+      int key2, long value2) {
+    int t = Long.compare(value1, value2);
+    return (t == 0) ? Integer.compare(key1, key2) : t;
+  }
+
+  @Override
+  public ObjectIterator<Int2LongMap.Entry> iterator() {
+    iterator.reset();
+    return iterator;
+  }
+
+  @Override
+  public int size() {
+    return size;
+  }
+
+  /**
+   * Check if the heap is empty
+   *
+   * @return True iff the heap is empty
+   */
+  public boolean isEmpty() {
+    return size == 0;
+  }
+
+  /**
+   * Get capacity of the heap
+   *
+   * @return Heap capacity
+   */
+  public int getCapacity() {
+    return capacity;
+  }
+
+  /**
+   * Serializes an object into data output.
+   *
+   * @param heap Object instance to serialize
+   * @param out  Data output
+   * @throws java.io.IOException
+   */
+  public static void write(FixedCapacityIntLongMinHeap heap,
+      DataOutput out) throws IOException {
+    out.writeInt(heap.capacity);
+    out.writeInt(heap.size);
+    for (int i = 0; i < heap.size(); i++) {
+      out.writeInt(heap.keys[i]);
+      out.writeLong(heap.values[i]);
+    }
+  }
+
+  /**
+   * Deserializes an object from data input.
+   *
+   * @param heap Object to reuse if possible
+   * @param in   Data input
+   * @return FixedCapacityIntLongMinHeap deserialized from data input.
+   * @throws IOException
+   */
+  public static FixedCapacityIntLongMinHeap read(
+      FixedCapacityIntLongMinHeap heap, DataInput in)
+      throws IOException {
+    int capacity = in.readInt();
+    if (heap == null || heap.capacity != capacity) {
+      heap = new FixedCapacityIntLongMinHeap(capacity);
+    } else {
+      heap.clear();
+    }
+    heap.size = in.readInt();
+    for (int i = 0; i < heap.size; i++) {
+      heap.keys[i] = in.readInt();
+      heap.values[i] = in.readLong();
+    }
+    return heap;
+  }
+
+  /**
+   * Takes a (key, value) pair, removes the root of the heap, and finds
+   * a position where the pair can be inserted.
+   *
+   * @param key   Key
+   * @param value Value
+   * @return Position in the heap where the (key, value) pair can be inserted
+   * while preserving the heap property.
+   */
+  private int removeRootAndFindPosition(int key, long value) {
+    int position = 0;
+    while (position < size) {
+      // Find the left child
+      int minChild = (position << 1) + 1;
+      // Compare the left and the right child values - find the smaller one
+      if (minChild + 1 < size &&
+          compare(keys[minChild + 1], values[minChild + 1],
+              keys[minChild], values[minChild]) < 0) {
+        minChild++;
+      }
+      if (minChild >= size || compare(keys[minChild], values[minChild],
+          key, value) >= 0) {
+        break;
+      }
+      keys[position] = keys[minChild];
+      values[position] = values[minChild];
+      position = minChild;
+    }
+    return position;
+  }
+
+  /**
+   * Traverse all elements of the heap, calling given function on each element.
+   *
+   * @param f Function to call on each element.
+   */
+  public void forEachIntLong(IntLongConsumer f) {
+    for (int i = 0; i < size(); ++i) {
+      f.apply(keys[i], values[i]);
+    }
+  }
+
+  /**
+   * Traverse all elements of the heap, calling given function on each element,
+   * or until predicate returns false.
+   *
+   * @param f Function to call on each element.
+   * @return true if the predicate returned true for all elements,
+   *    false if it returned false for some element.
+   */
+  public boolean forEachWhileIntLong(IntLongPredicate f) {
+    for (int i = 0; i < size(); ++i) {
+      if (!f.apply(keys[i], values[i])) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /** Iterator for FixedCapacityIntLongMinHeap */
+  private class IteratorImpl implements ObjectIterator<Int2LongMap.Entry> {
+    /** Reusable entry */
+    private final MutableEntry entry = new MutableEntry();
+    /** Current index */
+    private int index;
+
+    /** Reset the iterator so it can be reused */
+    public void reset() {
+      index = -1;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return index < size - 1;
+    }
+
+    @Override
+    public Int2LongMap.Entry next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      index++;
+      entry.setIntKey(keys[index]);
+      entry.setLongValue(values[index]);
+      return entry;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("remove() shouldn't be called");
+    }
+
+    @Override
+    public int skip(int i) {
+      throw new UnsupportedOperationException("skip(int) shouldn't be called");
+    }
+  }
+
+  /** Helper mutable Entry class */
+  private static class MutableEntry extends AbstractInt2LongMap.BasicEntry {
+    /** Default constructor */
+    private MutableEntry() {
+      super(0, 0);
+    }
+
+    /**
+     * Set key
+     *
+     * @param key Key to set
+     */
+    private void setIntKey(int key) {
+      this.key = key;
+    }
+
+    /**
+     * Set value
+     *
+     * @param value Value to set
+     */
+    private void setLongValue(long value) {
+      this.value = value;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4321e448/giraph-core/src/main/java/org/apache/giraph/types/heaps/FixedCapacityLongByteMinHeap.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/heaps/FixedCapacityLongByteMinHeap.java b/giraph-core/src/main/java/org/apache/giraph/types/heaps/FixedCapacityLongByteMinHeap.java
new file mode 100644
index 0000000..106fd71
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/types/heaps/FixedCapacityLongByteMinHeap.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.types.heaps;
+
+import it.unimi.dsi.fastutil.longs.AbstractLong2ByteMap;
+import it.unimi.dsi.fastutil.longs.Long2ByteMap;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+import org.apache.giraph.function.primitive.pairs.LongByteConsumer;
+import org.apache.giraph.function.primitive.pairs.LongBytePredicate;
+
+// AUTO-GENERATED class via class:
+// org.apache.giraph.generate.GeneratePrimitiveClasses
+
+/**
+ * Min heap which holds (long key, byte value) pairs with
+ * the largest values as its elements, up to the given maximum number of
+ * elements.
+ *
+ * When multiple elements with same values are added and there is no space for
+ * all of them in the heap, the one with larger keys will be kept in the heap.
+ *
+ * You can remove a pair with the minimum value currently in the heap.
+ */
+public class FixedCapacityLongByteMinHeap
+    implements Long2ByteMapEntryIterable {
+  /** Keys in the heap */
+  private final long[] keys;
+  /** Values in the heap */
+  private final byte[] values;
+  /** Number of elements currently in the heap */
+  private int size;
+  /** Capacity of the heap */
+  private final int capacity;
+  /** Reusable iterator instance */
+  private final IteratorImpl iterator;
+
+  /**
+   * Initialize the heap with desired capacity
+   *
+   * @param capacity Capacity
+   */
+  public FixedCapacityLongByteMinHeap(int capacity) {
+    keys = new long[capacity];
+    values = new byte[capacity];
+    size = 0;
+    this.capacity = capacity;
+    iterator = new IteratorImpl();
+  }
+
+  /** Clear the heap */
+  public void clear() {
+    size = 0;
+  }
+
+  /**
+   * Add a key value pair
+   *
+   * @param key   Key
+   * @param value Value
+   */
+  public void add(long key, byte value) {
+    if (size == capacity && compare(keys[0], values[0], key, value) >= 0) {
+      // If the heap is full and smallest element in it is not smaller
+      // than value, do nothing
+      return;
+    }
+    int position;
+    if (size < capacity) {
+      // If the heap is not full, increase its size and find the position for
+      // new element (up-heap search)
+      position = size;
+      size++;
+      while (position > 0) {
+        int parent = (position - 1) >> 1;
+        if (compare(keys[parent], values[parent], key, value) < 0) {
+          break;
+        }
+        values[position] = values[parent];
+        keys[position] = keys[parent];
+        position = parent;
+      }
+    } else {
+      // If the heap is full, remove element from the root and find the position
+      // for new element (down-heap search)
+      position = removeRootAndFindPosition(key, value);
+    }
+    // Fill position with key value pair
+    keys[position] = key;
+    values[position] = value;
+  }
+
+  /**
+   * @return Key corresponding to the minimum value currently in the heap
+   * @throws NoSuchElementException if the heap is empty.
+   */
+  public long getMinKey() {
+    if (size() > 0) {
+      return keys[0];
+    } else {
+      throw new NoSuchElementException();
+    }
+  }
+
+  /**
+   * @return Minimum value currently in the heap
+   * @throws NoSuchElementException if the heap is empty.
+   */
+  public byte getMinValue() {
+    if (size() > 0) {
+      return values[0];
+    } else {
+      throw new NoSuchElementException();
+    }
+  }
+
+  /**
+   * Removes the (key, value) pair that corresponds to the minimum value
+   * currently in the heap.
+   */
+  public void removeMin() {
+    if (size() > 0) {
+      size--;
+      int position = removeRootAndFindPosition(keys[size], values[size]);
+      keys[position] = keys[size];
+      values[position] = values[size];
+    }
+  }
+
+  /**
+   * Comapre two (key, value) entries
+   *
+   * @param key1   First key
+   * @param value1 First value
+   * @param key2   Second key
+   * @param value2 Second value
+   * @return 0 if entries are equal, < 0 if first entry is smaller than the
+   * second one, and > 0 if first entry is larger than the second one
+   */
+  protected int compare(long key1, byte value1,
+      long key2, byte value2) {
+    int t = Byte.compare(value1, value2);
+    return (t == 0) ? Long.compare(key1, key2) : t;
+  }
+
+  @Override
+  public ObjectIterator<Long2ByteMap.Entry> iterator() {
+    iterator.reset();
+    return iterator;
+  }
+
+  @Override
+  public int size() {
+    return size;
+  }
+
+  /**
+   * Check if the heap is empty
+   *
+   * @return True iff the heap is empty
+   */
+  public boolean isEmpty() {
+    return size == 0;
+  }
+
+  /**
+   * Get capacity of the heap
+   *
+   * @return Heap capacity
+   */
+  public int getCapacity() {
+    return capacity;
+  }
+
+  /**
+   * Serializes an object into data output.
+   *
+   * @param heap Object instance to serialize
+   * @param out  Data output
+   * @throws java.io.IOException
+   */
+  public static void write(FixedCapacityLongByteMinHeap heap,
+      DataOutput out) throws IOException {
+    out.writeInt(heap.capacity);
+    out.writeInt(heap.size);
+    for (int i = 0; i < heap.size(); i++) {
+      out.writeLong(heap.keys[i]);
+      out.writeByte(heap.values[i]);
+    }
+  }
+
+  /**
+   * Deserializes an object from data input.
+   *
+   * @param heap Object to reuse if possible
+   * @param in   Data input
+   * @return FixedCapacityLongByteMinHeap deserialized from data input.
+   * @throws IOException
+   */
+  public static FixedCapacityLongByteMinHeap read(
+      FixedCapacityLongByteMinHeap heap, DataInput in)
+      throws IOException {
+    int capacity = in.readInt();
+    if (heap == null || heap.capacity != capacity) {
+      heap = new FixedCapacityLongByteMinHeap(capacity);
+    } else {
+      heap.clear();
+    }
+    heap.size = in.readInt();
+    for (int i = 0; i < heap.size; i++) {
+      heap.keys[i] = in.readLong();
+      heap.values[i] = in.readByte();
+    }
+    return heap;
+  }
+
+  /**
+   * Takes a (key, value) pair, removes the root of the heap, and finds
+   * a position where the pair can be inserted.
+   *
+   * @param key   Key
+   * @param value Value
+   * @return Position in the heap where the (key, value) pair can be inserted
+   * while preserving the heap property.
+   */
+  private int removeRootAndFindPosition(long key, byte value) {
+    int position = 0;
+    while (position < size) {
+      // Find the left child
+      int minChild = (position << 1) + 1;
+      // Compare the left and the right child values - find the smaller one
+      if (minChild + 1 < size &&
+          compare(keys[minChild + 1], values[minChild + 1],
+              keys[minChild], values[minChild]) < 0) {
+        minChild++;
+      }
+      if (minChild >= size || compare(keys[minChild], values[minChild],
+          key, value) >= 0) {
+        break;
+      }
+      keys[position] = keys[minChild];
+      values[position] = values[minChild];
+      position = minChild;
+    }
+    return position;
+  }
+
+  /**
+   * Traverse all elements of the heap, calling given function on each element.
+   *
+   * @param f Function to call on each element.
+   */
+  public void forEachLongByte(LongByteConsumer f) {
+    for (int i = 0; i < size(); ++i) {
+      f.apply(keys[i], values[i]);
+    }
+  }
+
+  /**
+   * Traverse all elements of the heap, calling given function on each element,
+   * or until predicate returns false.
+   *
+   * @param f Function to call on each element.
+   * @return true if the predicate returned true for all elements,
+   *    false if it returned false for some element.
+   */
+  public boolean forEachWhileLongByte(LongBytePredicate f) {
+    for (int i = 0; i < size(); ++i) {
+      if (!f.apply(keys[i], values[i])) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /** Iterator for FixedCapacityLongByteMinHeap */
+  private class IteratorImpl implements ObjectIterator<Long2ByteMap.Entry> {
+    /** Reusable entry */
+    private final MutableEntry entry = new MutableEntry();
+    /** Current index */
+    private int index;
+
+    /** Reset the iterator so it can be reused */
+    public void reset() {
+      index = -1;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return index < size - 1;
+    }
+
+    @Override
+    public Long2ByteMap.Entry next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      index++;
+      entry.setLongKey(keys[index]);
+      entry.setByteValue(values[index]);
+      return entry;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("remove() shouldn't be called");
+    }
+
+    @Override
+    public int skip(int i) {
+      throw new UnsupportedOperationException("skip(int) shouldn't be called");
+    }
+  }
+
+  /** Helper mutable Entry class */
+  private static class MutableEntry extends AbstractLong2ByteMap.BasicEntry {
+    /** Default constructor */
+    private MutableEntry() {
+      super(0, (byte) 0);
+    }
+
+    /**
+     * Set key
+     *
+     * @param key Key to set
+     */
+    private void setLongKey(long key) {
+      this.key = key;
+    }
+
+    /**
+     * Set value
+     *
+     * @param value Value to set
+     */
+    private void setByteValue(byte value) {
+      this.value = value;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4321e448/giraph-core/src/main/java/org/apache/giraph/types/heaps/FixedCapacityLongDoubleMinHeap.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/heaps/FixedCapacityLongDoubleMinHeap.java b/giraph-core/src/main/java/org/apache/giraph/types/heaps/FixedCapacityLongDoubleMinHeap.java
new file mode 100644
index 0000000..90a5455
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/types/heaps/FixedCapacityLongDoubleMinHeap.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.types.heaps;
+
+import it.unimi.dsi.fastutil.longs.AbstractLong2DoubleMap;
+import it.unimi.dsi.fastutil.longs.Long2DoubleMap;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+import org.apache.giraph.function.primitive.pairs.LongDoubleConsumer;
+import org.apache.giraph.function.primitive.pairs.LongDoublePredicate;
+
+// AUTO-GENERATED class via class:
+// org.apache.giraph.generate.GeneratePrimitiveClasses
+
+/**
+ * Min heap which holds (long key, double value) pairs with
+ * the largest values as its elements, up to the given maximum number of
+ * elements.
+ *
+ * When multiple elements with same values are added and there is no space for
+ * all of them in the heap, the one with larger keys will be kept in the heap.
+ *
+ * You can remove a pair with the minimum value currently in the heap.
+ */
+public class FixedCapacityLongDoubleMinHeap
+    implements Long2DoubleMapEntryIterable {
+  /** Keys in the heap */
+  private final long[] keys;
+  /** Values in the heap */
+  private final double[] values;
+  /** Number of elements currently in the heap */
+  private int size;
+  /** Capacity of the heap */
+  private final int capacity;
+  /** Reusable iterator instance */
+  private final IteratorImpl iterator;
+
+  /**
+   * Initialize the heap with desired capacity
+   *
+   * @param capacity Capacity
+   */
+  public FixedCapacityLongDoubleMinHeap(int capacity) {
+    keys = new long[capacity];
+    values = new double[capacity];
+    size = 0;
+    this.capacity = capacity;
+    iterator = new IteratorImpl();
+  }
+
+  /** Clear the heap */
+  public void clear() {
+    size = 0;
+  }
+
+  /**
+   * Add a key value pair
+   *
+   * @param key   Key
+   * @param value Value
+   */
+  public void add(long key, double value) {
+    if (size == capacity && compare(keys[0], values[0], key, value) >= 0) {
+      // If the heap is full and smallest element in it is not smaller
+      // than value, do nothing
+      return;
+    }
+    int position;
+    if (size < capacity) {
+      // If the heap is not full, increase its size and find the position for
+      // new element (up-heap search)
+      position = size;
+      size++;
+      while (position > 0) {
+        int parent = (position - 1) >> 1;
+        if (compare(keys[parent], values[parent], key, value) < 0) {
+          break;
+        }
+        values[position] = values[parent];
+        keys[position] = keys[parent];
+        position = parent;
+      }
+    } else {
+      // If the heap is full, remove element from the root and find the position
+      // for new element (down-heap search)
+      position = removeRootAndFindPosition(key, value);
+    }
+    // Fill position with key value pair
+    keys[position] = key;
+    values[position] = value;
+  }
+
+  /**
+   * @return Key corresponding to the minimum value currently in the heap
+   * @throws NoSuchElementException if the heap is empty.
+   */
+  public long getMinKey() {
+    if (size() > 0) {
+      return keys[0];
+    } else {
+      throw new NoSuchElementException();
+    }
+  }
+
+  /**
+   * @return Minimum value currently in the heap
+   * @throws NoSuchElementException if the heap is empty.
+   */
+  public double getMinValue() {
+    if (size() > 0) {
+      return values[0];
+    } else {
+      throw new NoSuchElementException();
+    }
+  }
+
+  /**
+   * Removes the (key, value) pair that corresponds to the minimum value
+   * currently in the heap.
+   */
+  public void removeMin() {
+    if (size() > 0) {
+      size--;
+      int position = removeRootAndFindPosition(keys[size], values[size]);
+      keys[position] = keys[size];
+      values[position] = values[size];
+    }
+  }
+
+  /**
+   * Comapre two (key, value) entries
+   *
+   * @param key1   First key
+   * @param value1 First value
+   * @param key2   Second key
+   * @param value2 Second value
+   * @return 0 if entries are equal, < 0 if first entry is smaller than the
+   * second one, and > 0 if first entry is larger than the second one
+   */
+  protected int compare(long key1, double value1,
+      long key2, double value2) {
+    int t = Double.compare(value1, value2);
+    return (t == 0) ? Long.compare(key1, key2) : t;
+  }
+
+  @Override
+  public ObjectIterator<Long2DoubleMap.Entry> iterator() {
+    iterator.reset();
+    return iterator;
+  }
+
+  @Override
+  public int size() {
+    return size;
+  }
+
+  /**
+   * Check if the heap is empty
+   *
+   * @return True iff the heap is empty
+   */
+  public boolean isEmpty() {
+    return size == 0;
+  }
+
+  /**
+   * Get capacity of the heap
+   *
+   * @return Heap capacity
+   */
+  public int getCapacity() {
+    return capacity;
+  }
+
+  /**
+   * Serializes an object into data output.
+   *
+   * @param heap Object instance to serialize
+   * @param out  Data output
+   * @throws java.io.IOException
+   */
+  public static void write(FixedCapacityLongDoubleMinHeap heap,
+      DataOutput out) throws IOException {
+    out.writeInt(heap.capacity);
+    out.writeInt(heap.size);
+    for (int i = 0; i < heap.size(); i++) {
+      out.writeLong(heap.keys[i]);
+      out.writeDouble(heap.values[i]);
+    }
+  }
+
+  /**
+   * Deserializes an object from data input.
+   *
+   * @param heap Object to reuse if possible
+   * @param in   Data input
+   * @return FixedCapacityLongDoubleMinHeap deserialized from data input.
+   * @throws IOException
+   */
+  public static FixedCapacityLongDoubleMinHeap read(
+      FixedCapacityLongDoubleMinHeap heap, DataInput in)
+      throws IOException {
+    int capacity = in.readInt();
+    if (heap == null || heap.capacity != capacity) {
+      heap = new FixedCapacityLongDoubleMinHeap(capacity);
+    } else {
+      heap.clear();
+    }
+    heap.size = in.readInt();
+    for (int i = 0; i < heap.size; i++) {
+      heap.keys[i] = in.readLong();
+      heap.values[i] = in.readDouble();
+    }
+    return heap;
+  }
+
+  /**
+   * Takes a (key, value) pair, removes the root of the heap, and finds
+   * a position where the pair can be inserted.
+   *
+   * @param key   Key
+   * @param value Value
+   * @return Position in the heap where the (key, value) pair can be inserted
+   * while preserving the heap property.
+   */
+  private int removeRootAndFindPosition(long key, double value) {
+    int position = 0;
+    while (position < size) {
+      // Find the left child
+      int minChild = (position << 1) + 1;
+      // Compare the left and the right child values - find the smaller one
+      if (minChild + 1 < size &&
+          compare(keys[minChild + 1], values[minChild + 1],
+              keys[minChild], values[minChild]) < 0) {
+        minChild++;
+      }
+      if (minChild >= size || compare(keys[minChild], values[minChild],
+          key, value) >= 0) {
+        break;
+      }
+      keys[position] = keys[minChild];
+      values[position] = values[minChild];
+      position = minChild;
+    }
+    return position;
+  }
+
+  /**
+   * Traverse all elements of the heap, calling given function on each element.
+   *
+   * @param f Function to call on each element.
+   */
+  public void forEachLongDouble(LongDoubleConsumer f) {
+    for (int i = 0; i < size(); ++i) {
+      f.apply(keys[i], values[i]);
+    }
+  }
+
+  /**
+   * Traverse all elements of the heap, calling given function on each element,
+   * or until predicate returns false.
+   *
+   * @param f Function to call on each element.
+   * @return true if the predicate returned true for all elements,
+   *    false if it returned false for some element.
+   */
+  public boolean forEachWhileLongDouble(LongDoublePredicate f) {
+    for (int i = 0; i < size(); ++i) {
+      if (!f.apply(keys[i], values[i])) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /** Iterator for FixedCapacityLongDoubleMinHeap */
+  private class IteratorImpl implements ObjectIterator<Long2DoubleMap.Entry> {
+    /** Reusable entry */
+    private final MutableEntry entry = new MutableEntry();
+    /** Current index */
+    private int index;
+
+    /** Reset the iterator so it can be reused */
+    public void reset() {
+      index = -1;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return index < size - 1;
+    }
+
+    @Override
+    public Long2DoubleMap.Entry next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      index++;
+      entry.setLongKey(keys[index]);
+      entry.setDoubleValue(values[index]);
+      return entry;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("remove() shouldn't be called");
+    }
+
+    @Override
+    public int skip(int i) {
+      throw new UnsupportedOperationException("skip(int) shouldn't be called");
+    }
+  }
+
+  /** Helper mutable Entry class */
+  private static class MutableEntry extends AbstractLong2DoubleMap.BasicEntry {
+    /** Default constructor */
+    private MutableEntry() {
+      super(0, 0);
+    }
+
+    /**
+     * Set key
+     *
+     * @param key Key to set
+     */
+    private void setLongKey(long key) {
+      this.key = key;
+    }
+
+    /**
+     * Set value
+     *
+     * @param value Value to set
+     */
+    private void setDoubleValue(double value) {
+      this.value = value;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4321e448/giraph-core/src/main/java/org/apache/giraph/types/heaps/FixedCapacityLongFloatMinHeap.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/heaps/FixedCapacityLongFloatMinHeap.java b/giraph-core/src/main/java/org/apache/giraph/types/heaps/FixedCapacityLongFloatMinHeap.java
new file mode 100644
index 0000000..2be0d24
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/types/heaps/FixedCapacityLongFloatMinHeap.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.types.heaps;
+
+import it.unimi.dsi.fastutil.longs.AbstractLong2FloatMap;
+import it.unimi.dsi.fastutil.longs.Long2FloatMap;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+import org.apache.giraph.function.primitive.pairs.LongFloatConsumer;
+import org.apache.giraph.function.primitive.pairs.LongFloatPredicate;
+
+// AUTO-GENERATED class via class:
+// org.apache.giraph.generate.GeneratePrimitiveClasses
+
+/**
+ * Min heap which holds (long key, float value) pairs with
+ * the largest values as its elements, up to the given maximum number of
+ * elements.
+ *
+ * When multiple elements with same values are added and there is no space for
+ * all of them in the heap, the one with larger keys will be kept in the heap.
+ *
+ * You can remove a pair with the minimum value currently in the heap.
+ */
+public class FixedCapacityLongFloatMinHeap
+    implements Long2FloatMapEntryIterable {
+  /** Keys in the heap */
+  private final long[] keys;
+  /** Values in the heap */
+  private final float[] values;
+  /** Number of elements currently in the heap */
+  private int size;
+  /** Capacity of the heap */
+  private final int capacity;
+  /** Reusable iterator instance */
+  private final IteratorImpl iterator;
+
+  /**
+   * Initialize the heap with desired capacity
+   *
+   * @param capacity Capacity
+   */
+  public FixedCapacityLongFloatMinHeap(int capacity) {
+    keys = new long[capacity];
+    values = new float[capacity];
+    size = 0;
+    this.capacity = capacity;
+    iterator = new IteratorImpl();
+  }
+
+  /** Clear the heap */
+  public void clear() {
+    size = 0;
+  }
+
+  /**
+   * Add a key value pair
+   *
+   * @param key   Key
+   * @param value Value
+   */
+  public void add(long key, float value) {
+    if (size == capacity && compare(keys[0], values[0], key, value) >= 0) {
+      // If the heap is full and smallest element in it is not smaller
+      // than value, do nothing
+      return;
+    }
+    int position;
+    if (size < capacity) {
+      // If the heap is not full, increase its size and find the position for
+      // new element (up-heap search)
+      position = size;
+      size++;
+      while (position > 0) {
+        int parent = (position - 1) >> 1;
+        if (compare(keys[parent], values[parent], key, value) < 0) {
+          break;
+        }
+        values[position] = values[parent];
+        keys[position] = keys[parent];
+        position = parent;
+      }
+    } else {
+      // If the heap is full, remove element from the root and find the position
+      // for new element (down-heap search)
+      position = removeRootAndFindPosition(key, value);
+    }
+    // Fill position with key value pair
+    keys[position] = key;
+    values[position] = value;
+  }
+
+  /**
+   * @return Key corresponding to the minimum value currently in the heap
+   * @throws NoSuchElementException if the heap is empty.
+   */
+  public long getMinKey() {
+    if (size() > 0) {
+      return keys[0];
+    } else {
+      throw new NoSuchElementException();
+    }
+  }
+
+  /**
+   * @return Minimum value currently in the heap
+   * @throws NoSuchElementException if the heap is empty.
+   */
+  public float getMinValue() {
+    if (size() > 0) {
+      return values[0];
+    } else {
+      throw new NoSuchElementException();
+    }
+  }
+
+  /**
+   * Removes the (key, value) pair that corresponds to the minimum value
+   * currently in the heap.
+   */
+  public void removeMin() {
+    if (size() > 0) {
+      size--;
+      int position = removeRootAndFindPosition(keys[size], values[size]);
+      keys[position] = keys[size];
+      values[position] = values[size];
+    }
+  }
+
+  /**
+   * Comapre two (key, value) entries
+   *
+   * @param key1   First key
+   * @param value1 First value
+   * @param key2   Second key
+   * @param value2 Second value
+   * @return 0 if entries are equal, < 0 if first entry is smaller than the
+   * second one, and > 0 if first entry is larger than the second one
+   */
+  protected int compare(long key1, float value1,
+      long key2, float value2) {
+    int t = Float.compare(value1, value2);
+    return (t == 0) ? Long.compare(key1, key2) : t;
+  }
+
+  @Override
+  public ObjectIterator<Long2FloatMap.Entry> iterator() {
+    iterator.reset();
+    return iterator;
+  }
+
+  @Override
+  public int size() {
+    return size;
+  }
+
+  /**
+   * Check if the heap is empty
+   *
+   * @return True iff the heap is empty
+   */
+  public boolean isEmpty() {
+    return size == 0;
+  }
+
+  /**
+   * Get capacity of the heap
+   *
+   * @return Heap capacity
+   */
+  public int getCapacity() {
+    return capacity;
+  }
+
+  /**
+   * Serializes an object into data output.
+   *
+   * @param heap Object instance to serialize
+   * @param out  Data output
+   * @throws java.io.IOException
+   */
+  public static void write(FixedCapacityLongFloatMinHeap heap,
+      DataOutput out) throws IOException {
+    out.writeInt(heap.capacity);
+    out.writeInt(heap.size);
+    for (int i = 0; i < heap.size(); i++) {
+      out.writeLong(heap.keys[i]);
+      out.writeFloat(heap.values[i]);
+    }
+  }
+
+  /**
+   * Deserializes an object from data input.
+   *
+   * @param heap Object to reuse if possible
+   * @param in   Data input
+   * @return FixedCapacityLongFloatMinHeap deserialized from data input.
+   * @throws IOException
+   */
+  public static FixedCapacityLongFloatMinHeap read(
+      FixedCapacityLongFloatMinHeap heap, DataInput in)
+      throws IOException {
+    int capacity = in.readInt();
+    if (heap == null || heap.capacity != capacity) {
+      heap = new FixedCapacityLongFloatMinHeap(capacity);
+    } else {
+      heap.clear();
+    }
+    heap.size = in.readInt();
+    for (int i = 0; i < heap.size; i++) {
+      heap.keys[i] = in.readLong();
+      heap.values[i] = in.readFloat();
+    }
+    return heap;
+  }
+
+  /**
+   * Takes a (key, value) pair, removes the root of the heap, and finds
+   * a position where the pair can be inserted.
+   *
+   * @param key   Key
+   * @param value Value
+   * @return Position in the heap where the (key, value) pair can be inserted
+   * while preserving the heap property.
+   */
+  private int removeRootAndFindPosition(long key, float value) {
+    int position = 0;
+    while (position < size) {
+      // Find the left child
+      int minChild = (position << 1) + 1;
+      // Compare the left and the right child values - find the smaller one
+      if (minChild + 1 < size &&
+          compare(keys[minChild + 1], values[minChild + 1],
+              keys[minChild], values[minChild]) < 0) {
+        minChild++;
+      }
+      if (minChild >= size || compare(keys[minChild], values[minChild],
+          key, value) >= 0) {
+        break;
+      }
+      keys[position] = keys[minChild];
+      values[position] = values[minChild];
+      position = minChild;
+    }
+    return position;
+  }
+
+  /**
+   * Traverse all elements of the heap, calling given function on each element.
+   *
+   * @param f Function to call on each element.
+   */
+  public void forEachLongFloat(LongFloatConsumer f) {
+    for (int i = 0; i < size(); ++i) {
+      f.apply(keys[i], values[i]);
+    }
+  }
+
+  /**
+   * Traverse all elements of the heap, calling given function on each element,
+   * or until predicate returns false.
+   *
+   * @param f Function to call on each element.
+   * @return true if the predicate returned true for all elements,
+   *    false if it returned false for some element.
+   */
+  public boolean forEachWhileLongFloat(LongFloatPredicate f) {
+    for (int i = 0; i < size(); ++i) {
+      if (!f.apply(keys[i], values[i])) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /** Iterator for FixedCapacityLongFloatMinHeap */
+  private class IteratorImpl implements ObjectIterator<Long2FloatMap.Entry> {
+    /** Reusable entry */
+    private final MutableEntry entry = new MutableEntry();
+    /** Current index */
+    private int index;
+
+    /** Reset the iterator so it can be reused */
+    public void reset() {
+      index = -1;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return index < size - 1;
+    }
+
+    @Override
+    public Long2FloatMap.Entry next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      index++;
+      entry.setLongKey(keys[index]);
+      entry.setFloatValue(values[index]);
+      return entry;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("remove() shouldn't be called");
+    }
+
+    @Override
+    public int skip(int i) {
+      throw new UnsupportedOperationException("skip(int) shouldn't be called");
+    }
+  }
+
+  /** Helper mutable Entry class */
+  private static class MutableEntry extends AbstractLong2FloatMap.BasicEntry {
+    /** Default constructor */
+    private MutableEntry() {
+      super(0, 0);
+    }
+
+    /**
+     * Set key
+     *
+     * @param key Key to set
+     */
+    private void setLongKey(long key) {
+      this.key = key;
+    }
+
+    /**
+     * Set value
+     *
+     * @param value Value to set
+     */
+    private void setFloatValue(float value) {
+      this.value = value;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4321e448/giraph-core/src/main/java/org/apache/giraph/types/heaps/FixedCapacityLongIntMinHeap.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/heaps/FixedCapacityLongIntMinHeap.java b/giraph-core/src/main/java/org/apache/giraph/types/heaps/FixedCapacityLongIntMinHeap.java
new file mode 100644
index 0000000..5bbecfe
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/types/heaps/FixedCapacityLongIntMinHeap.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.types.heaps;
+
+import it.unimi.dsi.fastutil.longs.AbstractLong2IntMap;
+import it.unimi.dsi.fastutil.longs.Long2IntMap;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+import org.apache.giraph.function.primitive.pairs.LongIntConsumer;
+import org.apache.giraph.function.primitive.pairs.LongIntPredicate;
+
+// AUTO-GENERATED class via class:
+// org.apache.giraph.generate.GeneratePrimitiveClasses
+
+/**
+ * Min heap which holds (long key, int value) pairs with
+ * the largest values as its elements, up to the given maximum number of
+ * elements.
+ *
+ * When multiple elements with same values are added and there is no space for
+ * all of them in the heap, the one with larger keys will be kept in the heap.
+ *
+ * You can remove a pair with the minimum value currently in the heap.
+ */
+public class FixedCapacityLongIntMinHeap
+    implements Long2IntMapEntryIterable {
+  /** Keys in the heap */
+  private final long[] keys;
+  /** Values in the heap */
+  private final int[] values;
+  /** Number of elements currently in the heap */
+  private int size;
+  /** Capacity of the heap */
+  private final int capacity;
+  /** Reusable iterator instance */
+  private final IteratorImpl iterator;
+
+  /**
+   * Initialize the heap with desired capacity
+   *
+   * @param capacity Capacity
+   */
+  public FixedCapacityLongIntMinHeap(int capacity) {
+    keys = new long[capacity];
+    values = new int[capacity];
+    size = 0;
+    this.capacity = capacity;
+    iterator = new IteratorImpl();
+  }
+
+  /** Clear the heap */
+  public void clear() {
+    size = 0;
+  }
+
+  /**
+   * Add a key value pair
+   *
+   * @param key   Key
+   * @param value Value
+   */
+  public void add(long key, int value) {
+    if (size == capacity && compare(keys[0], values[0], key, value) >= 0) {
+      // If the heap is full and smallest element in it is not smaller
+      // than value, do nothing
+      return;
+    }
+    int position;
+    if (size < capacity) {
+      // If the heap is not full, increase its size and find the position for
+      // new element (up-heap search)
+      position = size;
+      size++;
+      while (position > 0) {
+        int parent = (position - 1) >> 1;
+        if (compare(keys[parent], values[parent], key, value) < 0) {
+          break;
+        }
+        values[position] = values[parent];
+        keys[position] = keys[parent];
+        position = parent;
+      }
+    } else {
+      // If the heap is full, remove element from the root and find the position
+      // for new element (down-heap search)
+      position = removeRootAndFindPosition(key, value);
+    }
+    // Fill position with key value pair
+    keys[position] = key;
+    values[position] = value;
+  }
+
+  /**
+   * @return Key corresponding to the minimum value currently in the heap
+   * @throws NoSuchElementException if the heap is empty.
+   */
+  public long getMinKey() {
+    if (size() > 0) {
+      return keys[0];
+    } else {
+      throw new NoSuchElementException();
+    }
+  }
+
+  /**
+   * @return Minimum value currently in the heap
+   * @throws NoSuchElementException if the heap is empty.
+   */
+  public int getMinValue() {
+    if (size() > 0) {
+      return values[0];
+    } else {
+      throw new NoSuchElementException();
+    }
+  }
+
+  /**
+   * Removes the (key, value) pair that corresponds to the minimum value
+   * currently in the heap.
+   */
+  public void removeMin() {
+    if (size() > 0) {
+      size--;
+      int position = removeRootAndFindPosition(keys[size], values[size]);
+      keys[position] = keys[size];
+      values[position] = values[size];
+    }
+  }
+
+  /**
+   * Comapre two (key, value) entries
+   *
+   * @param key1   First key
+   * @param value1 First value
+   * @param key2   Second key
+   * @param value2 Second value
+   * @return 0 if entries are equal, < 0 if first entry is smaller than the
+   * second one, and > 0 if first entry is larger than the second one
+   */
+  protected int compare(long key1, int value1,
+      long key2, int value2) {
+    int t = Integer.compare(value1, value2);
+    return (t == 0) ? Long.compare(key1, key2) : t;
+  }
+
+  @Override
+  public ObjectIterator<Long2IntMap.Entry> iterator() {
+    iterator.reset();
+    return iterator;
+  }
+
+  @Override
+  public int size() {
+    return size;
+  }
+
+  /**
+   * Check if the heap is empty
+   *
+   * @return True iff the heap is empty
+   */
+  public boolean isEmpty() {
+    return size == 0;
+  }
+
+  /**
+   * Get capacity of the heap
+   *
+   * @return Heap capacity
+   */
+  public int getCapacity() {
+    return capacity;
+  }
+
+  /**
+   * Serializes an object into data output.
+   *
+   * @param heap Object instance to serialize
+   * @param out  Data output
+   * @throws java.io.IOException
+   */
+  public static void write(FixedCapacityLongIntMinHeap heap,
+      DataOutput out) throws IOException {
+    out.writeInt(heap.capacity);
+    out.writeInt(heap.size);
+    for (int i = 0; i < heap.size(); i++) {
+      out.writeLong(heap.keys[i]);
+      out.writeInt(heap.values[i]);
+    }
+  }
+
+  /**
+   * Deserializes an object from data input.
+   *
+   * @param heap Object to reuse if possible
+   * @param in   Data input
+   * @return FixedCapacityLongIntMinHeap deserialized from data input.
+   * @throws IOException
+   */
+  public static FixedCapacityLongIntMinHeap read(
+      FixedCapacityLongIntMinHeap heap, DataInput in)
+      throws IOException {
+    int capacity = in.readInt();
+    if (heap == null || heap.capacity != capacity) {
+      heap = new FixedCapacityLongIntMinHeap(capacity);
+    } else {
+      heap.clear();
+    }
+    heap.size = in.readInt();
+    for (int i = 0; i < heap.size; i++) {
+      heap.keys[i] = in.readLong();
+      heap.values[i] = in.readInt();
+    }
+    return heap;
+  }
+
+  /**
+   * Takes a (key, value) pair, removes the root of the heap, and finds
+   * a position where the pair can be inserted.
+   *
+   * @param key   Key
+   * @param value Value
+   * @return Position in the heap where the (key, value) pair can be inserted
+   * while preserving the heap property.
+   */
+  private int removeRootAndFindPosition(long key, int value) {
+    int position = 0;
+    while (position < size) {
+      // Find the left child
+      int minChild = (position << 1) + 1;
+      // Compare the left and the right child values - find the smaller one
+      if (minChild + 1 < size &&
+          compare(keys[minChild + 1], values[minChild + 1],
+              keys[minChild], values[minChild]) < 0) {
+        minChild++;
+      }
+      if (minChild >= size || compare(keys[minChild], values[minChild],
+          key, value) >= 0) {
+        break;
+      }
+      keys[position] = keys[minChild];
+      values[position] = values[minChild];
+      position = minChild;
+    }
+    return position;
+  }
+
+  /**
+   * Traverse all elements of the heap, calling given function on each element.
+   *
+   * @param f Function to call on each element.
+   */
+  public void forEachLongInt(LongIntConsumer f) {
+    for (int i = 0; i < size(); ++i) {
+      f.apply(keys[i], values[i]);
+    }
+  }
+
+  /**
+   * Traverse all elements of the heap, calling given function on each element,
+   * or until predicate returns false.
+   *
+   * @param f Function to call on each element.
+   * @return true if the predicate returned true for all elements,
+   *    false if it returned false for some element.
+   */
+  public boolean forEachWhileLongInt(LongIntPredicate f) {
+    for (int i = 0; i < size(); ++i) {
+      if (!f.apply(keys[i], values[i])) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /** Iterator for FixedCapacityLongIntMinHeap */
+  private class IteratorImpl implements ObjectIterator<Long2IntMap.Entry> {
+    /** Reusable entry */
+    private final MutableEntry entry = new MutableEntry();
+    /** Current index */
+    private int index;
+
+    /** Reset the iterator so it can be reused */
+    public void reset() {
+      index = -1;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return index < size - 1;
+    }
+
+    @Override
+    public Long2IntMap.Entry next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      index++;
+      entry.setLongKey(keys[index]);
+      entry.setIntValue(values[index]);
+      return entry;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("remove() shouldn't be called");
+    }
+
+    @Override
+    public int skip(int i) {
+      throw new UnsupportedOperationException("skip(int) shouldn't be called");
+    }
+  }
+
+  /** Helper mutable Entry class */
+  private static class MutableEntry extends AbstractLong2IntMap.BasicEntry {
+    /** Default constructor */
+    private MutableEntry() {
+      super(0, 0);
+    }
+
+    /**
+     * Set key
+     *
+     * @param key Key to set
+     */
+    private void setLongKey(long key) {
+      this.key = key;
+    }
+
+    /**
+     * Set value
+     *
+     * @param value Value to set
+     */
+    private void setIntValue(int value) {
+      this.value = value;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4321e448/giraph-core/src/main/java/org/apache/giraph/types/heaps/FixedCapacityLongLongMinHeap.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/heaps/FixedCapacityLongLongMinHeap.java b/giraph-core/src/main/java/org/apache/giraph/types/heaps/FixedCapacityLongLongMinHeap.java
new file mode 100644
index 0000000..7db14a0
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/types/heaps/FixedCapacityLongLongMinHeap.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.types.heaps;
+
+import it.unimi.dsi.fastutil.longs.AbstractLong2LongMap;
+import it.unimi.dsi.fastutil.longs.Long2LongMap;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+import org.apache.giraph.function.primitive.pairs.LongLongConsumer;
+import org.apache.giraph.function.primitive.pairs.LongLongPredicate;
+
+// AUTO-GENERATED class via class:
+// org.apache.giraph.generate.GeneratePrimitiveClasses
+
+/**
+ * Min heap which holds (long key, long value) pairs with
+ * the largest values as its elements, up to the given maximum number of
+ * elements.
+ *
+ * When multiple elements with same values are added and there is no space for
+ * all of them in the heap, the one with larger keys will be kept in the heap.
+ *
+ * You can remove a pair with the minimum value currently in the heap.
+ */
+public class FixedCapacityLongLongMinHeap
+    implements Long2LongMapEntryIterable {
+  /** Keys in the heap */
+  private final long[] keys;
+  /** Values in the heap */
+  private final long[] values;
+  /** Number of elements currently in the heap */
+  private int size;
+  /** Capacity of the heap */
+  private final int capacity;
+  /** Reusable iterator instance */
+  private final IteratorImpl iterator;
+
+  /**
+   * Initialize the heap with desired capacity
+   *
+   * @param capacity Capacity
+   */
+  public FixedCapacityLongLongMinHeap(int capacity) {
+    keys = new long[capacity];
+    values = new long[capacity];
+    size = 0;
+    this.capacity = capacity;
+    iterator = new IteratorImpl();
+  }
+
+  /** Clear the heap */
+  public void clear() {
+    size = 0;
+  }
+
+  /**
+   * Add a key value pair
+   *
+   * @param key   Key
+   * @param value Value
+   */
+  public void add(long key, long value) {
+    if (size == capacity && compare(keys[0], values[0], key, value) >= 0) {
+      // If the heap is full and smallest element in it is not smaller
+      // than value, do nothing
+      return;
+    }
+    int position;
+    if (size < capacity) {
+      // If the heap is not full, increase its size and find the position for
+      // new element (up-heap search)
+      position = size;
+      size++;
+      while (position > 0) {
+        int parent = (position - 1) >> 1;
+        if (compare(keys[parent], values[parent], key, value) < 0) {
+          break;
+        }
+        values[position] = values[parent];
+        keys[position] = keys[parent];
+        position = parent;
+      }
+    } else {
+      // If the heap is full, remove element from the root and find the position
+      // for new element (down-heap search)
+      position = removeRootAndFindPosition(key, value);
+    }
+    // Fill position with key value pair
+    keys[position] = key;
+    values[position] = value;
+  }
+
+  /**
+   * @return Key corresponding to the minimum value currently in the heap
+   * @throws NoSuchElementException if the heap is empty.
+   */
+  public long getMinKey() {
+    if (size() > 0) {
+      return keys[0];
+    } else {
+      throw new NoSuchElementException();
+    }
+  }
+
+  /**
+   * @return Minimum value currently in the heap
+   * @throws NoSuchElementException if the heap is empty.
+   */
+  public long getMinValue() {
+    if (size() > 0) {
+      return values[0];
+    } else {
+      throw new NoSuchElementException();
+    }
+  }
+
+  /**
+   * Removes the (key, value) pair that corresponds to the minimum value
+   * currently in the heap.
+   */
+  public void removeMin() {
+    if (size() > 0) {
+      size--;
+      int position = removeRootAndFindPosition(keys[size], values[size]);
+      keys[position] = keys[size];
+      values[position] = values[size];
+    }
+  }
+
+  /**
+   * Comapre two (key, value) entries
+   *
+   * @param key1   First key
+   * @param value1 First value
+   * @param key2   Second key
+   * @param value2 Second value
+   * @return 0 if entries are equal, < 0 if first entry is smaller than the
+   * second one, and > 0 if first entry is larger than the second one
+   */
+  protected int compare(long key1, long value1,
+      long key2, long value2) {
+    int t = Long.compare(value1, value2);
+    return (t == 0) ? Long.compare(key1, key2) : t;
+  }
+
+  @Override
+  public ObjectIterator<Long2LongMap.Entry> iterator() {
+    iterator.reset();
+    return iterator;
+  }
+
+  @Override
+  public int size() {
+    return size;
+  }
+
+  /**
+   * Check if the heap is empty
+   *
+   * @return True iff the heap is empty
+   */
+  public boolean isEmpty() {
+    return size == 0;
+  }
+
+  /**
+   * Get capacity of the heap
+   *
+   * @return Heap capacity
+   */
+  public int getCapacity() {
+    return capacity;
+  }
+
+  /**
+   * Serializes an object into data output.
+   *
+   * @param heap Object instance to serialize
+   * @param out  Data output
+   * @throws java.io.IOException
+   */
+  public static void write(FixedCapacityLongLongMinHeap heap,
+      DataOutput out) throws IOException {
+    out.writeInt(heap.capacity);
+    out.writeInt(heap.size);
+    for (int i = 0; i < heap.size(); i++) {
+      out.writeLong(heap.keys[i]);
+      out.writeLong(heap.values[i]);
+    }
+  }
+
+  /**
+   * Deserializes an object from data input.
+   *
+   * @param heap Object to reuse if possible
+   * @param in   Data input
+   * @return FixedCapacityLongLongMinHeap deserialized from data input.
+   * @throws IOException
+   */
+  public static FixedCapacityLongLongMinHeap read(
+      FixedCapacityLongLongMinHeap heap, DataInput in)
+      throws IOException {
+    int capacity = in.readInt();
+    if (heap == null || heap.capacity != capacity) {
+      heap = new FixedCapacityLongLongMinHeap(capacity);
+    } else {
+      heap.clear();
+    }
+    heap.size = in.readInt();
+    for (int i = 0; i < heap.size; i++) {
+      heap.keys[i] = in.readLong();
+      heap.values[i] = in.readLong();
+    }
+    return heap;
+  }
+
+  /**
+   * Takes a (key, value) pair, removes the root of the heap, and finds
+   * a position where the pair can be inserted.
+   *
+   * @param key   Key
+   * @param value Value
+   * @return Position in the heap where the (key, value) pair can be inserted
+   * while preserving the heap property.
+   */
+  private int removeRootAndFindPosition(long key, long value) {
+    int position = 0;
+    while (position < size) {
+      // Find the left child
+      int minChild = (position << 1) + 1;
+      // Compare the left and the right child values - find the smaller one
+      if (minChild + 1 < size &&
+          compare(keys[minChild + 1], values[minChild + 1],
+              keys[minChild], values[minChild]) < 0) {
+        minChild++;
+      }
+      if (minChild >= size || compare(keys[minChild], values[minChild],
+          key, value) >= 0) {
+        break;
+      }
+      keys[position] = keys[minChild];
+      values[position] = values[minChild];
+      position = minChild;
+    }
+    return position;
+  }
+
+  /**
+   * Traverse all elements of the heap, calling given function on each element.
+   *
+   * @param f Function to call on each element.
+   */
+  public void forEachLongLong(LongLongConsumer f) {
+    for (int i = 0; i < size(); ++i) {
+      f.apply(keys[i], values[i]);
+    }
+  }
+
+  /**
+   * Traverse all elements of the heap, calling given function on each element,
+   * or until predicate returns false.
+   *
+   * @param f Function to call on each element.
+   * @return true if the predicate returned true for all elements,
+   *    false if it returned false for some element.
+   */
+  public boolean forEachWhileLongLong(LongLongPredicate f) {
+    for (int i = 0; i < size(); ++i) {
+      if (!f.apply(keys[i], values[i])) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /** Iterator for FixedCapacityLongLongMinHeap */
+  private class IteratorImpl implements ObjectIterator<Long2LongMap.Entry> {
+    /** Reusable entry */
+    private final MutableEntry entry = new MutableEntry();
+    /** Current index */
+    private int index;
+
+    /** Reset the iterator so it can be reused */
+    public void reset() {
+      index = -1;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return index < size - 1;
+    }
+
+    @Override
+    public Long2LongMap.Entry next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      index++;
+      entry.setLongKey(keys[index]);
+      entry.setLongValue(values[index]);
+      return entry;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("remove() shouldn't be called");
+    }
+
+    @Override
+    public int skip(int i) {
+      throw new UnsupportedOperationException("skip(int) shouldn't be called");
+    }
+  }
+
+  /** Helper mutable Entry class */
+  private static class MutableEntry extends AbstractLong2LongMap.BasicEntry {
+    /** Default constructor */
+    private MutableEntry() {
+      super(0, 0);
+    }
+
+    /**
+     * Set key
+     *
+     * @param key Key to set
+     */
+    private void setLongKey(long key) {
+      this.key = key;
+    }
+
+    /**
+     * Set value
+     *
+     * @param value Value to set
+     */
+    private void setLongValue(long value) {
+      this.value = value;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4321e448/giraph-core/src/main/java/org/apache/giraph/types/heaps/Int2ByteMapEntryIterable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/heaps/Int2ByteMapEntryIterable.java b/giraph-core/src/main/java/org/apache/giraph/types/heaps/Int2ByteMapEntryIterable.java
new file mode 100644
index 0000000..d37cc5b
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/types/heaps/Int2ByteMapEntryIterable.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.types.heaps;
+
+import it.unimi.dsi.fastutil.ints.Int2ByteMap;;
+import it.unimi.dsi.fastutil.objects.ObjectIterable;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
+
+/**
+ * Iterable which has its size and ObjectIterator<Int2ByteMap.Entry>
+ */
+public interface Int2ByteMapEntryIterable
+    extends ObjectIterable<Int2ByteMap.Entry> {
+  /**
+   * Get the iterator. Not thread-safe and reuses iterator object,
+   * so you can't have several iterators at the same time.
+   *
+   * @return Iterator
+   */
+  ObjectIterator<Int2ByteMap.Entry> iterator();
+
+  /**
+   * Get the size of this iterable
+   *
+   * @return Size
+   */
+  int size();
+}