You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2011/11/17 10:50:42 UTC

svn commit: r1203130 - in /incubator/giraph/trunk: ./ src/main/java/org/apache/giraph/benchmark/ src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/graph/ src/test/java/org/apache/giraph/ src/test...

Author: aching
Date: Thu Nov 17 09:50:41 2011
New Revision: 1203130

URL: http://svn.apache.org/viewvc?rev=1203130&view=rev
Log:
GIRAPH-91: Large-memory improvements (Memory reduced vertex
implementation, fast failure, added settings). (aching)


Added:
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java
    incubator/giraph/trunk/src/test/java/org/apache/giraph/graph/
    incubator/giraph/trunk/src/test/java/org/apache/giraph/graph/TestEdgeListVertex.java
Modified:
    incubator/giraph/trunk/CHANGELOG
    incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
    incubator/giraph/trunk/src/test/java/org/apache/giraph/TestJsonBase64Format.java

Modified: incubator/giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1203130&r1=1203129&r2=1203130&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Thu Nov 17 09:50:41 2011
@@ -2,6 +2,9 @@ Giraph Change Log
 
 Release 0.70.0 - unreleased
 
+  GIRAPH-91: Large-memory improvements (Memory reduced vertex
+  implementation, fast failure, added settings). (aching)
+
   GIRAPH-89: Remove debugging system.out from LongDoubleFloatDoubleVertex. 
   (shaunak via aching)
 

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java?rev=1203130&r1=1203129&r2=1203130&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java Thu Nov 17 09:50:41 2011
@@ -23,6 +23,7 @@ import org.apache.commons.cli.CommandLin
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
+import org.apache.giraph.graph.EdgeListVertex;
 import org.apache.giraph.graph.GiraphJob;
 import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.conf.Configuration;
@@ -36,33 +37,60 @@ import java.util.Iterator;
 /**
  * Benchmark based on the basic Pregel PageRank implementation.
  */
-public class PageRankBenchmark extends
-        Vertex<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable>
-        implements Tool {
+public class PageRankBenchmark implements Tool {
     /** Configuration from Configurable */
     private Configuration conf;
 
     /** How many supersteps to run */
     public static String SUPERSTEP_COUNT = "PageRankBenchmark.superstepCount";
 
-    @Override
-    public void compute(Iterator<DoubleWritable> msgIterator) {
-        if (getSuperstep() >= 1) {
-            double sum = 0;
-            while (msgIterator.hasNext()) {
-                sum += msgIterator.next().get();
+    public static class PageRankVertex extends Vertex<
+            LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> {
+        @Override
+        public void compute(Iterator<DoubleWritable> msgIterator) {
+            if (getSuperstep() >= 1) {
+                double sum = 0;
+                while (msgIterator.hasNext()) {
+                    sum += msgIterator.next().get();
+                }
+                DoubleWritable vertexValue =
+                    new DoubleWritable((0.15f / getNumVertices()) + 0.85f *
+                                       sum);
+                setVertexValue(vertexValue);
+            }
+
+            if (getSuperstep() < getConf().getInt(SUPERSTEP_COUNT, -1)) {
+                long edges = getNumOutEdges();
+                sendMsgToAllEdges(
+                    new DoubleWritable(getVertexValue().get() / edges));
+            } else {
+                voteToHalt();
             }
-            DoubleWritable vertexValue =
-                new DoubleWritable((0.15f / getNumVertices()) + 0.85f * sum);
-            setVertexValue(vertexValue);
         }
+    }
 
-        if (getSuperstep() < getConf().getInt(SUPERSTEP_COUNT, -1)) {
-            long edges = getNumOutEdges();
-            sendMsgToAllEdges(
-                new DoubleWritable(getVertexValue().get() / edges));
-        } else {
-            voteToHalt();
+    public static class PageRankEdgeListVertex extends EdgeListVertex<
+            LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> {
+        @Override
+        public void compute(Iterator<DoubleWritable> msgIterator) {
+            if (getSuperstep() >= 1) {
+                double sum = 0;
+                while (msgIterator.hasNext()) {
+                    sum += msgIterator.next().get();
+                }
+                DoubleWritable vertexValue =
+                    new DoubleWritable((0.15f / getNumVertices()) + 0.85f *
+                                       sum);
+                setVertexValue(vertexValue);
+            }
+
+            if (getSuperstep() < getConf().getInt(SUPERSTEP_COUNT, -1)) {
+                long edges = getNumOutEdges();
+                sendMsgToAllEdges(
+                        new DoubleWritable(getVertexValue().get() / edges));
+            } else {
+                voteToHalt();
+            }
         }
     }
 
@@ -97,6 +125,10 @@ public class PageRankBenchmark extends
                           "edgesPerVertex",
                           true,
                           "Edges per vertex");
+        options.addOption("c",
+                          "vertexClass",
+                          true,
+                          "Vertex class (0 for Vertex, 1 for EdgeListVertex)");
         HelpFormatter formatter = new HelpFormatter();
         if (args.length == 0) {
             formatter.printHelp(getClass().getName(), options, true);
@@ -125,9 +157,19 @@ public class PageRankBenchmark extends
                                "per vertex (-e)");
             return -1;
         }
+
         int workers = Integer.parseInt(cmd.getOptionValue('w'));
         GiraphJob job = new GiraphJob(getConf(), getClass().getName());
-        job.setVertexClass(getClass());
+        if (!cmd.hasOption('c') ||
+                (Integer.parseInt(cmd.getOptionValue('c')) == 0)) {
+            System.out.println("Using " +
+                                PageRankVertex.class.getName());
+            job.setVertexClass(PageRankVertex.class);
+        } else {
+            System.out.println("Using " +
+                                PageRankEdgeListVertex.class.getName());
+            job.setVertexClass(PageRankEdgeListVertex.class);
+        }
         job.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
         job.setWorkerConfiguration(workers, workers, 100.0f);
         job.getConfiguration().setLong(

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java?rev=1203130&r1=1203129&r2=1203130&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java Thu Nov 17 09:50:41 2011
@@ -153,4 +153,9 @@ public interface CentralizedServiceWorke
      * @return BspMapper
      */
     GraphMapper<I, V, E, M> getGraphMapper();
+
+    /**
+     * Operations that will be called if there is a failure by a worker.
+     */
+    void failureCleanup();
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java?rev=1203130&r1=1203129&r2=1203130&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java Thu Nov 17 09:50:41 2011
@@ -851,6 +851,7 @@ end[HADOOP_FACEBOOK]*/
         for (Future<?> future : futures) {
             try {
                 future.get();
+                context.progress();
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1203130&r1=1203129&r2=1203130&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Thu Nov 17 09:50:41 2011
@@ -136,8 +136,9 @@ public class BspServiceWorker<
         commService = new RPCCommunications<I, V, E, M>(
             context, this, graphState);
         graphState.setWorkerCommunications(commService);
-        this.workerContext = BspUtils.createWorkerContext(getConfiguration(),
-            graphMapper.getGraphState());
+        this.workerContext =
+            BspUtils.createWorkerContext(getConfiguration(),
+                                         graphMapper.getGraphState());
     }
 
     public WorkerContext getWorkerContext() {
@@ -710,6 +711,30 @@ public class BspServiceWorker<
         }
     }
 
+    /**
+     * Do this to help notify the master quicker that this worker has failed.
+     */
+    private void unregisterHealth() {
+        LOG.error("unregisterHealth: Got failure, unregistering health on " +
+                  myHealthZnode + " on superstep " + getSuperstep());
+        try {
+            getZkExt().delete(myHealthZnode, -1);
+        } catch (InterruptedException e) {
+            throw new IllegalStateException(
+                "unregisterHealth: InterruptedException - Couldn't delete " +
+                myHealthZnode, e);
+        } catch (KeeperException e) {
+            throw new IllegalStateException(
+                "unregisterHealth: KeeperException - Couldn't delete " +
+                myHealthZnode, e);
+        }
+    }
+
+    @Override
+    public void failureCleanup() {
+        unregisterHealth();
+    }
+
     @Override
     public Collection<? extends PartitionOwner> startSuperstep() {
         // Algorithm:

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java?rev=1203130&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java Thu Nov 17 09:50:41 2011
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Lists;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * User applications can subclass {@link EdgeListVertex}, which stores
+ * the outbound edges in an ArrayList (less memory at the cost of expensive
+ * sorting and random-access lookup).  Good for static graphs.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class EdgeListVertex<I extends WritableComparable,
+        V extends Writable,
+        E extends Writable, M extends Writable>
+        extends MutableVertex<I, V, E, M> {
+    /** Class logger */
+    private static final Logger LOG = Logger.getLogger(EdgeListVertex.class);
+    /** Vertex id */
+    private I vertexId = null;
+    /** Vertex value */
+    private V vertexValue = null;
+    /** List of the dest edge indices */
+    private List<I> destEdgeIndexList;
+    /** List of the dest edge values, kept parallel to destEdgeIndexList
+     *  (entry i is the edge value for the dest index at position i) */
+    private List<E> destEdgeValueList;
+    /** List of incoming messages from the previous superstep */
+    private final List<M> msgList = new ArrayList<M>();
+
+    @Override
+    public void initialize(I vertexId, V vertexValue,
+                           Map<I, E> edges,
+                           List<M> messages) {
+        if (vertexId != null) {
+            setVertexId(vertexId);
+        }
+        if (vertexValue != null) {
+            setVertexValue(vertexValue);
+        }
+        if (edges != null && !edges.isEmpty()) {
+            destEdgeIndexList = Lists.newArrayListWithCapacity(edges.size());
+            destEdgeValueList = Lists.newArrayListWithCapacity(edges.size());
+            List<I> sortedIndexList = new ArrayList<I>(edges.keySet());
+            Collections.sort(sortedIndexList, new VertexIdComparator());
+            for (I index : sortedIndexList) {
+                destEdgeIndexList.add(index);
+                destEdgeValueList.add(edges.get(index));
+            }
+            sortedIndexList.clear();
+        } else {
+            destEdgeIndexList = Lists.newArrayList();
+            destEdgeValueList = Lists.newArrayList();
+        }
+        if (messages != null && !messages.isEmpty()) {
+            msgList.addAll(messages);
+        }
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (other instanceof EdgeListVertex) {
+            @SuppressWarnings("unchecked")
+            EdgeListVertex<I, V, E, M> otherVertex = (EdgeListVertex) other;
+            if (!getVertexId().equals(otherVertex.getVertexId())) {
+                return false;
+            }
+            if (!getVertexValue().equals(otherVertex.getVertexValue())) {
+                return false;
+            }
+            if (!getMsgList().equals(otherVertex.getMsgList())) {
+                return false;
+            }
+            Iterator<I> iterator = iterator();
+            Iterator<I> otherIterator = otherVertex.iterator();
+            while (iterator.hasNext() && otherIterator.hasNext()) {
+                I index = iterator.next();
+                I otherIndex = otherIterator.next();
+                if (!(index == null ? otherIndex == null :
+                        index.equals(otherIndex))) {
+                    return false;
+                }
+            }
+            return !(iterator.hasNext() || otherIterator.hasNext());
+        }
+        return false;
+    }
+
+    /**
+     * Comparator for the vertex id
+     */
+    private class VertexIdComparator implements Comparator<I> {
+        @SuppressWarnings("unchecked")
+        @Override
+        public int compare(I index1, I index2) {
+            return index1.compareTo(index2);
+        }
+    }
+
+    /** Insert the edge at its sorted position; false if it already exists. */
+    @Override
+    public final boolean addEdge(I targetVertexId, E edgeValue) {
+        int pos = Collections.binarySearch(destEdgeIndexList, targetVertexId,
+                                           new VertexIdComparator());
+        if (pos < 0) {
+            // Absent: binarySearch returned -(insertion point) - 1
+            int insertPos = -(pos + 1);
+            destEdgeIndexList.add(insertPos, targetVertexId);
+            destEdgeValueList.add(insertPos, edgeValue);
+            return true;
+        }
+        LOG.warn("addEdge: Vertex=" + vertexId +
+                 ": already added an edge value for dest vertex id " +
+                 targetVertexId);
+        return false;
+    }
+
+    @Override
+    public long getSuperstep() {
+        return getGraphState().getSuperstep();
+    }
+
+    @Override
+    public final void setVertexId(I vertexId) {
+        this.vertexId = vertexId;
+    }
+
+    @Override
+    public final I getVertexId() {
+        return vertexId;
+    }
+
+    @Override
+    public final V getVertexValue() {
+        return vertexValue;
+    }
+
+    @Override
+    public final void setVertexValue(V vertexValue) {
+        this.vertexValue = vertexValue;
+    }
+
+    /**
+     * Look up the value of the edge to targetVertexId, if any.
+     *
+     * @return the edge value, or null when this vertex has no such edge
+     */
+    @Override
+    public E getEdgeValue(I targetVertexId) {
+        int pos = Collections.binarySearch(destEdgeIndexList, targetVertexId,
+                                           new VertexIdComparator());
+        // binarySearch returns a negative value when the key is absent
+        return (pos < 0) ? null : destEdgeValueList.get(pos);
+    }
+
+    /**
+     * Check whether this vertex has an out-edge to targetVertexId.
+     *
+     * @return true if the edge exists, false otherwise
+     */
+    @Override
+    public boolean hasEdge(I targetVertexId) {
+        int pos = Collections.binarySearch(destEdgeIndexList, targetVertexId,
+                                           new VertexIdComparator());
+        // binarySearch returns a negative value when the key is absent
+        return pos >= 0;
+    }
+
+    /**
+     * Get an iterator to the edges on this vertex.
+     *
+     * @return A <em>sorted</em> iterator, as defined by the sort-order
+     *         of the vertex ids
+     */
+    @Override
+    public Iterator<I> iterator() {
+        return destEdgeIndexList.iterator();
+    }
+
+    @Override
+    public int getNumOutEdges() {
+        return destEdgeIndexList.size();
+    }
+
+    /**
+     * @return the removed edge value, or null when there was no such edge
+     */
+    @Override
+    public E removeEdge(I targetVertexId) {
+        int pos = Collections.binarySearch(destEdgeIndexList, targetVertexId,
+                                           new VertexIdComparator());
+        if (pos < 0) {
+            return null;
+        }
+        destEdgeIndexList.remove(pos);
+        return destEdgeValueList.remove(pos);
+    }
+
+    @Override
+    public final void sendMsgToAllEdges(M msg) {
+        if (msg == null) {
+            throw new IllegalArgumentException(
+                "sendMsgToAllEdges: Cannot send null message to all edges");
+        }
+        for (I index : destEdgeIndexList) {
+            sendMsg(index, msg);
+        }
+    }
+
+    @Override
+    public void addVertexRequest(MutableVertex<I, V, E, M> vertex)
+            throws IOException {
+        getGraphState().getWorkerCommunications().
+            addVertexReq(vertex);
+    }
+
+    @Override
+    public void removeVertexRequest(I vertexId) throws IOException {
+        getGraphState().getWorkerCommunications().
+            removeVertexReq(vertexId);
+    }
+
+    @Override
+    public void addEdgeRequest(I vertexIndex,
+                               Edge<I, E> edge) throws IOException {
+        getGraphState().getWorkerCommunications().
+            addEdgeReq(vertexIndex, edge);
+    }
+
+    @Override
+    public void removeEdgeRequest(I sourceVertexId,
+                                  I destVertexId) throws IOException {
+        getGraphState().getWorkerCommunications().
+            removeEdgeReq(sourceVertexId, destVertexId);
+    }
+
+    @Override
+    final public void readFields(DataInput in) throws IOException {
+        vertexId = BspUtils.<I>createVertexIndex(getConf());
+        vertexId.readFields(in);
+        boolean hasVertexValue = in.readBoolean();
+        if (hasVertexValue) {
+            vertexValue = BspUtils.<V>createVertexValue(getConf());
+            vertexValue.readFields(in);
+        }
+        int edgeListCount = in.readInt();
+        destEdgeIndexList = Lists.newArrayListWithCapacity(edgeListCount);
+        destEdgeValueList = Lists.newArrayListWithCapacity(edgeListCount);
+        for (int i = 0; i < edgeListCount; ++i) {
+            I vertexId = BspUtils.<I>createVertexIndex(getConf());
+            E edgeValue = BspUtils.<E>createEdgeValue(getConf());
+            vertexId.readFields(in);
+            edgeValue.readFields(in);
+            destEdgeIndexList.add(vertexId);
+            destEdgeValueList.add(edgeValue);
+        }
+        int msgListSize = in.readInt();
+        for (int i = 0; i < msgListSize; ++i) {
+            M msg = BspUtils.<M>createMessageValue(getConf());
+            msg.readFields(in);
+            msgList.add(msg);
+        }
+        halt = in.readBoolean();
+    }
+
+    @Override
+    final public void write(DataOutput out) throws IOException {
+        vertexId.write(out);
+        out.writeBoolean(vertexValue != null);
+        if (vertexValue != null) {
+            vertexValue.write(out);
+        }
+        out.writeInt(destEdgeIndexList.size());
+        for (int i = 0 ; i < destEdgeIndexList.size(); ++i) {
+            destEdgeIndexList.get(i).write(out);
+            destEdgeValueList.get(i).write(out);
+        }
+        out.writeInt(msgList.size());
+        for (M msg : msgList) {
+            msg.write(out);
+        }
+        out.writeBoolean(halt);
+    }
+
+    @Override
+    public List<M> getMsgList() {
+        return msgList;
+    }
+
+    @Override
+    public String toString() {
+        return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() +
+            ",#edges=" + getNumOutEdges() + ")";
+    }
+}
+

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1203130&r1=1203129&r2=1203130&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Thu Nov 17 09:50:41 2011
@@ -22,6 +22,7 @@ import org.apache.giraph.bsp.BspInputFor
 import org.apache.giraph.bsp.BspOutputFormat;
 import org.apache.giraph.graph.partition.GraphPartitionerFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.log4j.Logger;
 
@@ -261,7 +262,7 @@ public class GiraphJob extends Job {
     public static final Boolean KEEP_ZOOKEEPER_DATA_DEFAULT = false;
 
     /** Default ZooKeeper tick time. */
-    public static final int DEFAULT_ZOOKEEPER_TICK_TIME = 2000;
+    public static final int DEFAULT_ZOOKEEPER_TICK_TIME = 6000;
     /** Default ZooKeeper init limit (in ticks). */
     public static final int DEFAULT_ZOOKEEPER_INIT_LIMIT = 10;
     /** Default ZooKeeper sync limit (in ticks). */
@@ -270,10 +271,10 @@ public class GiraphJob extends Job {
     public static final int DEFAULT_ZOOKEEPER_SNAP_COUNT = 50000;
     /** Default ZooKeeper maximum client connections. */
     public static final int DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS = 10000;
-    /** Default ZooKeeper minimum session timeout (in msecs). */
-    public static final int DEFAULT_ZOOKEEPER_MIN_SESSION_TIMEOUT = 10000;
-    /** Default ZooKeeper maximum session timeout (in msecs). */
-    public static final int DEFAULT_ZOOKEEPER_MAX_SESSION_TIMEOUT = 100000;
+    /** Default ZooKeeper minimum session timeout of 5 minutes (in msecs). */
+    public static final int DEFAULT_ZOOKEEPER_MIN_SESSION_TIMEOUT = 300*1000;
+    /** Default ZooKeeper maximum session timeout of 10 minutes (in msecs). */
+    public static final int DEFAULT_ZOOKEEPER_MAX_SESSION_TIMEOUT = 600*1000;
 
     /** Class logger */
     private static final Logger LOG = Logger.getLogger(GiraphJob.class);
@@ -511,6 +512,10 @@ public class GiraphJob extends Job {
         // Speculative execution doesn't make sense for Giraph
         conf.setBoolean("mapred.map.tasks.speculative.execution", false);
 
+        // Set the ping interval to 5 minutes instead of one minute
+        // (DEFAULT_PING_INTERVAL)
+        Client.setPingInterval(conf, 60000*5);
+
         if (getJar() == null) {
             setJarByClass(GiraphJob.class);
         }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1203130&r1=1203129&r2=1203130&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java Thu Nov 17 09:50:41 2011
@@ -627,4 +627,25 @@ public class GraphMapper<I extends Writa
                 ZooKeeperManager.State.FINISHED);
         }
     }
+
+    @Override
+    public void run(Context context) throws IOException, InterruptedException {
+        // Notify the master quicker if there is worker failure rather than
+        // waiting for ZooKeeper to timeout and delete the ephemeral znodes
+        try {
+            setup(context);
+            while (context.nextKeyValue()) {
+                map(context.getCurrentKey(),
+                    context.getCurrentValue(),
+                    context);
+            }
+            cleanup(context);
+        } catch (Exception e) {
+            if (mapFunctions == MapFunctions.WORKER_ONLY) {
+                serviceWorker.failureCleanup();
+            }
+            throw new IllegalStateException(
+                "run: Caught an unrecoverable exception " + e.getMessage(), e);
+        }
+    }
 }

Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/TestJsonBase64Format.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestJsonBase64Format.java?rev=1203130&r1=1203129&r2=1203130&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestJsonBase64Format.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestJsonBase64Format.java Thu Nov 17 09:50:41 2011
@@ -65,7 +65,7 @@ public class TestJsonBase64Format extend
             throws IOException, InterruptedException, ClassNotFoundException {
         GiraphJob job = new GiraphJob(getCallingMethodName());
         setupConfiguration(job);
-        job.setVertexClass(PageRankBenchmark.class);
+        job.setVertexClass(PageRankBenchmark.PageRankVertex.class);
         job.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
         job.setVertexOutputFormatClass(JsonBase64VertexOutputFormat.class);
         job.getConfiguration().setLong(
@@ -79,7 +79,7 @@ public class TestJsonBase64Format extend
 
         job = new GiraphJob(getCallingMethodName());
         setupConfiguration(job);
-        job.setVertexClass(PageRankBenchmark.class);
+        job.setVertexClass(PageRankBenchmark.PageRankVertex.class);
         job.setVertexInputFormatClass(JsonBase64VertexInputFormat.class);
         job.setVertexOutputFormatClass(JsonBase64VertexOutputFormat.class);
         job.getConfiguration().setInt(PageRankBenchmark.SUPERSTEP_COUNT, 3);
@@ -95,7 +95,7 @@ public class TestJsonBase64Format extend
 
         job = new GiraphJob(getCallingMethodName());
         setupConfiguration(job);
-        job.setVertexClass(PageRankBenchmark.class);
+        job.setVertexClass(PageRankBenchmark.PageRankVertex.class);
         job.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
         job.setVertexOutputFormatClass(JsonBase64VertexOutputFormat.class);
         job.getConfiguration().setLong(

Added: incubator/giraph/trunk/src/test/java/org/apache/giraph/graph/TestEdgeListVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/graph/TestEdgeListVertex.java?rev=1203130&view=auto
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/graph/TestEdgeListVertex.java (added)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/graph/TestEdgeListVertex.java Thu Nov 17 09:50:41 2011
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.graph;
+
+
+import junit.framework.TestCase;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.utils.WritableUtils;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests {@link EdgeListVertex}.
+ */
+public class TestEdgeListVertex extends TestCase {
+    /** Instantiated vertex filled in from setup() */
+    private IFDLEdgeListVertex vertex;
+    /** Job filled in by setup() */
+    private GiraphJob job;
+
+    /**
+     * Simple instantiable class that extends {@link EdgeListVertex}.
+     */
+    private static class IFDLEdgeListVertex extends
+            EdgeListVertex<IntWritable, FloatWritable, DoubleWritable,
+            LongWritable> {
+
+        @Override
+        public void compute(Iterator<LongWritable> msgIterator)
+                throws IOException {
+        }
+    }
+
+    @Override
+    public void setUp() {
+        try {
+            job = new GiraphJob("TestEdgeArrayVertex");
+        } catch (IOException e) {
+            throw new RuntimeException("setUp: Failed", e);
+        }
+        job.setVertexClass(IFDLEdgeListVertex.class);
+        job.getConfiguration().setClass(GiraphJob.VERTEX_INDEX_CLASS,
+            IntWritable.class, WritableComparable.class);
+        job.getConfiguration().setClass(GiraphJob.VERTEX_VALUE_CLASS,
+            FloatWritable.class, Writable.class);
+        job.getConfiguration().setClass(GiraphJob.EDGE_VALUE_CLASS,
+            DoubleWritable.class, Writable.class);
+        job.getConfiguration().setClass(GiraphJob.MESSAGE_VALUE_CLASS,
+            LongWritable.class, Writable.class);
+        vertex = (IFDLEdgeListVertex)
+            BspUtils.<IntWritable, FloatWritable, DoubleWritable, LongWritable>
+            createVertex(job.getConfiguration());
+    }
+
+    public void testInstantiate() throws IOException {
+        assertNotNull(vertex);
+    }
+
+    public void testEdges() {
+        Map<IntWritable, DoubleWritable> edgeMap = Maps.newHashMap();
+        for (int i = 1000; i > 0; --i) {
+            edgeMap.put(new IntWritable(i), new DoubleWritable(i*2.0));
+        }
+        vertex.initialize(null, null, edgeMap, null);
+        assertEquals(vertex.getNumOutEdges(), 1000);
+        int expectedIndex = 1;
+        for (IntWritable index : vertex) {
+            assertEquals(index.get(), expectedIndex);
+            assertEquals(vertex.getEdgeValue(index).get(),
+                         expectedIndex * 2.0d);
+            ++expectedIndex;
+        }
+        assertEquals(vertex.removeEdge(new IntWritable(500)),
+                     new DoubleWritable(1000));
+        assertEquals(vertex.getNumOutEdges(), 999);
+    }
+
+    public void testSerialize() {
+        Map<IntWritable, DoubleWritable> edgeMap = Maps.newHashMap();
+        for (int i = 1000; i > 0; --i) {
+            edgeMap.put(new IntWritable(i), new DoubleWritable(i*2.0));
+        }
+        List<LongWritable> messageList = Lists.newArrayList();
+        messageList.add(new LongWritable(4));
+        messageList.add(new LongWritable(5));
+        vertex.initialize(
+            new IntWritable(2), new FloatWritable(3.0f), edgeMap, messageList);
+        byte[] byteArray = WritableUtils.writeToByteArray(vertex);
+        IFDLEdgeListVertex readVertex = (IFDLEdgeListVertex)
+            BspUtils.<IntWritable, FloatWritable, DoubleWritable, LongWritable>
+            createVertex(job.getConfiguration());
+        WritableUtils.readFieldsFromByteArray(byteArray, readVertex);
+        assertEquals(vertex, readVertex);
+    }
+}