You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2011/11/17 10:50:42 UTC
svn commit: r1203130 - in /incubator/giraph/trunk: ./
src/main/java/org/apache/giraph/benchmark/
src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/
src/main/java/org/apache/giraph/graph/ src/test/java/org/apache/giraph/
src/test...
Author: aching
Date: Thu Nov 17 09:50:41 2011
New Revision: 1203130
URL: http://svn.apache.org/viewvc?rev=1203130&view=rev
Log:
GIRAPH-91: Large-memory improvements (Memory reduced vertex
implementation, fast failure, added settings). (aching)
Added:
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java
incubator/giraph/trunk/src/test/java/org/apache/giraph/graph/
incubator/giraph/trunk/src/test/java/org/apache/giraph/graph/TestEdgeListVertex.java
Modified:
incubator/giraph/trunk/CHANGELOG
incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
incubator/giraph/trunk/src/test/java/org/apache/giraph/TestJsonBase64Format.java
Modified: incubator/giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1203130&r1=1203129&r2=1203130&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Thu Nov 17 09:50:41 2011
@@ -2,6 +2,9 @@ Giraph Change Log
Release 0.70.0 - unreleased
+ GIRAPH-91: Large-memory improvements (Memory reduced vertex
+ implementation, fast failure, added settings). (aching)
+
GIRAPH-89: Remove debugging system.out from LongDoubleFloatDoubleVertex.
(shaunak via aching)
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java?rev=1203130&r1=1203129&r2=1203130&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java Thu Nov 17 09:50:41 2011
@@ -23,6 +23,7 @@ import org.apache.commons.cli.CommandLin
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
+import org.apache.giraph.graph.EdgeListVertex;
import org.apache.giraph.graph.GiraphJob;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.conf.Configuration;
@@ -36,33 +37,60 @@ import java.util.Iterator;
/**
* Benchmark based on the basic Pregel PageRank implementation.
*/
-public class PageRankBenchmark extends
- Vertex<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable>
- implements Tool {
+public class PageRankBenchmark implements Tool {
/** Configuration from Configurable */
private Configuration conf;
/** How many supersteps to run */
public static String SUPERSTEP_COUNT = "PageRankBenchmark.superstepCount";
- @Override
- public void compute(Iterator<DoubleWritable> msgIterator) {
- if (getSuperstep() >= 1) {
- double sum = 0;
- while (msgIterator.hasNext()) {
- sum += msgIterator.next().get();
+ public static class PageRankVertex extends Vertex<
+ LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> {
+ @Override
+ public void compute(Iterator<DoubleWritable> msgIterator) {
+ if (getSuperstep() >= 1) {
+ double sum = 0;
+ while (msgIterator.hasNext()) {
+ sum += msgIterator.next().get();
+ }
+ DoubleWritable vertexValue =
+ new DoubleWritable((0.15f / getNumVertices()) + 0.85f *
+ sum);
+ setVertexValue(vertexValue);
+ }
+
+ if (getSuperstep() < getConf().getInt(SUPERSTEP_COUNT, -1)) {
+ long edges = getNumOutEdges();
+ sendMsgToAllEdges(
+ new DoubleWritable(getVertexValue().get() / edges));
+ } else {
+ voteToHalt();
}
- DoubleWritable vertexValue =
- new DoubleWritable((0.15f / getNumVertices()) + 0.85f * sum);
- setVertexValue(vertexValue);
}
+ }
- if (getSuperstep() < getConf().getInt(SUPERSTEP_COUNT, -1)) {
- long edges = getNumOutEdges();
- sendMsgToAllEdges(
- new DoubleWritable(getVertexValue().get() / edges));
- } else {
- voteToHalt();
+ public static class PageRankEdgeListVertex extends EdgeListVertex<
+ LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> {
+ @Override
+ public void compute(Iterator<DoubleWritable> msgIterator) {
+ if (getSuperstep() >= 1) {
+ double sum = 0;
+ while (msgIterator.hasNext()) {
+ sum += msgIterator.next().get();
+ }
+ DoubleWritable vertexValue =
+ new DoubleWritable((0.15f / getNumVertices()) + 0.85f *
+ sum);
+ setVertexValue(vertexValue);
+ }
+
+ if (getSuperstep() < getConf().getInt(SUPERSTEP_COUNT, -1)) {
+ long edges = getNumOutEdges();
+ sendMsgToAllEdges(
+ new DoubleWritable(getVertexValue().get() / edges));
+ } else {
+ voteToHalt();
+ }
}
}
@@ -97,6 +125,10 @@ public class PageRankBenchmark extends
"edgesPerVertex",
true,
"Edges per vertex");
+ options.addOption("c",
+ "vertexClass",
+ true,
+ "Vertex class (0 for Vertex, 1 for EdgeListVertex)");
HelpFormatter formatter = new HelpFormatter();
if (args.length == 0) {
formatter.printHelp(getClass().getName(), options, true);
@@ -125,9 +157,19 @@ public class PageRankBenchmark extends
"per vertex (-e)");
return -1;
}
+
int workers = Integer.parseInt(cmd.getOptionValue('w'));
GiraphJob job = new GiraphJob(getConf(), getClass().getName());
- job.setVertexClass(getClass());
+ if (!cmd.hasOption('c') ||
+ (Integer.parseInt(cmd.getOptionValue('c')) == 0)) {
+ System.out.println("Using " +
+ PageRankVertex.class.getName());
+ job.setVertexClass(PageRankVertex.class);
+ } else {
+ System.out.println("Using " +
+ PageRankEdgeListVertex.class.getName());
+ job.setVertexClass(PageRankEdgeListVertex.class);
+ }
job.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
job.setWorkerConfiguration(workers, workers, 100.0f);
job.getConfiguration().setLong(
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java?rev=1203130&r1=1203129&r2=1203130&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java Thu Nov 17 09:50:41 2011
@@ -153,4 +153,9 @@ public interface CentralizedServiceWorke
* @return BspMapper
*/
GraphMapper<I, V, E, M> getGraphMapper();
+
+ /**
+ * Operations that will be called if there is a failure by a worker.
+ */
+ void failureCleanup();
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java?rev=1203130&r1=1203129&r2=1203130&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java Thu Nov 17 09:50:41 2011
@@ -851,6 +851,7 @@ end[HADOOP_FACEBOOK]*/
for (Future<?> future : futures) {
try {
future.get();
+ context.progress();
} catch (Exception e) {
throw new RuntimeException(e);
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1203130&r1=1203129&r2=1203130&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Thu Nov 17 09:50:41 2011
@@ -136,8 +136,9 @@ public class BspServiceWorker<
commService = new RPCCommunications<I, V, E, M>(
context, this, graphState);
graphState.setWorkerCommunications(commService);
- this.workerContext = BspUtils.createWorkerContext(getConfiguration(),
- graphMapper.getGraphState());
+ this.workerContext =
+ BspUtils.createWorkerContext(getConfiguration(),
+ graphMapper.getGraphState());
}
public WorkerContext getWorkerContext() {
@@ -710,6 +711,30 @@ public class BspServiceWorker<
}
}
+ /**
+ * Do this to help notify the master quicker that this worker has failed.
+ */
+ private void unregisterHealth() {
+ LOG.error("unregisterHealth: Got failure, unregistering health on " +
+ myHealthZnode + " on superstep " + getSuperstep());
+ try {
+ getZkExt().delete(myHealthZnode, -1);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "unregisterHealth: InterruptedException - Couldn't delete " +
+ myHealthZnode, e);
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "unregisterHealth: KeeperException - Couldn't delete " +
+ myHealthZnode, e);
+ }
+ }
+
+ @Override
+ public void failureCleanup() {
+ unregisterHealth();
+ }
+
@Override
public Collection<? extends PartitionOwner> startSuperstep() {
// Algorithm:
Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java?rev=1203130&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java Thu Nov 17 09:50:41 2011
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Lists;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * User applications can subclass {@link EdgeListVertex}, which stores
+ * the outbound edges in an ArrayList (less memory at the cost of expensive
+ * sorting and random-access lookup). Good for static graphs.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class EdgeListVertex<I extends WritableComparable,
+ V extends Writable,
+ E extends Writable, M extends Writable>
+ extends MutableVertex<I, V, E, M> {
+ /** Class logger */
+ private static final Logger LOG = Logger.getLogger(EdgeListVertex.class);
+ /** Vertex id */
+ private I vertexId = null;
+ /** Vertex value */
+ private V vertexValue = null;
+ /** List of the dest edge indices */
+ private List<I> destEdgeIndexList;
+ // Kept in the same order as destEdgeIndexList (parallel lists)
+ /** List of the dest edge values */
+ private List<E> destEdgeValueList;
+ /** List of incoming messages from the previous superstep */
+ private final List<M> msgList = new ArrayList<M>();
+
+ @Override
+ public void initialize(I vertexId, V vertexValue,
+ Map<I, E> edges,
+ List<M> messages) {
+ if (vertexId != null) {
+ setVertexId(vertexId);
+ }
+ if (vertexValue != null) {
+ setVertexValue(vertexValue);
+ }
+ if (edges != null && !edges.isEmpty()) {
+ destEdgeIndexList = Lists.newArrayListWithCapacity(edges.size());
+ destEdgeValueList = Lists.newArrayListWithCapacity(edges.size());
+ List<I> sortedIndexList = new ArrayList<I>(edges.keySet());
+ Collections.sort(sortedIndexList, new VertexIdComparator());
+ for (I index : sortedIndexList) {
+ destEdgeIndexList.add(index);
+ destEdgeValueList.add(edges.get(index));
+ }
+ sortedIndexList.clear();
+ } else {
+ destEdgeIndexList = Lists.newArrayList();
+ destEdgeValueList = Lists.newArrayList();
+ }
+ if (messages != null && !messages.isEmpty()) {
+ msgList.addAll(messages);
+ }
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other instanceof EdgeListVertex) {
+ @SuppressWarnings("unchecked")
+ EdgeListVertex<I, V, E, M> otherVertex = (EdgeListVertex) other;
+ if (!getVertexId().equals(otherVertex.getVertexId())) {
+ return false;
+ }
+ if (!getVertexValue().equals(otherVertex.getVertexValue())) {
+ return false;
+ }
+ if (!getMsgList().equals(otherVertex.getMsgList())) {
+ return false;
+ }
+ Iterator<I> iterator = iterator();
+ Iterator<I> otherIterator = otherVertex.iterator();
+ while (iterator.hasNext() && otherIterator.hasNext()) {
+ I index = iterator.next();
+ I otherIndex = otherIterator.next();
+ if (!(index == null ? otherIndex == null :
+ index.equals(otherIndex))) {
+ return false;
+ }
+ }
+ return !(iterator.hasNext() || otherIterator.hasNext());
+ }
+ return false;
+ }
+
+ /**
+ * Comparator for the vertex id
+ */
+ private class VertexIdComparator implements Comparator<I> {
+ @SuppressWarnings("unchecked")
+ @Override
+ public int compare(I index1, I index2) {
+ return index1.compareTo(index2);
+ }
+ }
+
+ @Override
+ public final boolean addEdge(I targetVertexId, E edgeValue) {
+ int pos = Collections.binarySearch(destEdgeIndexList,
+ targetVertexId,
+ new VertexIdComparator());
+ if (pos == destEdgeIndexList.size() ||
+ !destEdgeIndexList.get(pos).equals(targetVertexId)) {
+ destEdgeIndexList.add(pos, targetVertexId);
+ destEdgeValueList.add(pos, edgeValue);
+ return true;
+ } else {
+ LOG.warn("addEdge: Vertex=" + vertexId +
+ ": already added an edge value for dest vertex id " +
+ targetVertexId);
+ return false;
+ }
+ }
+
+ @Override
+ public long getSuperstep() {
+ return getGraphState().getSuperstep();
+ }
+
+ @Override
+ public final void setVertexId(I vertexId) {
+ this.vertexId = vertexId;
+ }
+
+ @Override
+ public final I getVertexId() {
+ return vertexId;
+ }
+
+ @Override
+ public final V getVertexValue() {
+ return vertexValue;
+ }
+
+ @Override
+ public final void setVertexValue(V vertexValue) {
+ this.vertexValue = vertexValue;
+ }
+
+ @Override
+ public E getEdgeValue(I targetVertexId) {
+ int pos = Collections.binarySearch(destEdgeIndexList,
+ targetVertexId,
+ new VertexIdComparator());
+ if (pos == destEdgeIndexList.size() ||
+ !destEdgeIndexList.get(pos).equals(targetVertexId)) {
+ return null;
+ } else {
+ return destEdgeValueList.get(pos);
+ }
+ }
+
+ @Override
+ public boolean hasEdge(I targetVertexId) {
+ int pos = Collections.binarySearch(destEdgeIndexList,
+ targetVertexId,
+ new VertexIdComparator());
+ if (pos == destEdgeIndexList.size() ||
+ !destEdgeIndexList.get(pos).equals(targetVertexId)) {
+ return false;
+ } else {
+ return true;
+ }
+ }
+
+ /**
+ * Get an iterator to the edges on this vertex.
+ *
+ * @return A <em>sorted</em> iterator, as defined by the sort-order
+ * of the vertex ids
+ */
+ @Override
+ public Iterator<I> iterator() {
+ return destEdgeIndexList.iterator();
+ }
+
+ @Override
+ public int getNumOutEdges() {
+ return destEdgeIndexList.size();
+ }
+
+ @Override
+ public E removeEdge(I targetVertexId) {
+ int pos = Collections.binarySearch(destEdgeIndexList,
+ targetVertexId,
+ new VertexIdComparator());
+ if (pos == destEdgeIndexList.size() ||
+ !destEdgeIndexList.get(pos).equals(targetVertexId)) {
+ return null;
+ } else {
+ destEdgeIndexList.remove(pos);
+ return destEdgeValueList.remove(pos);
+ }
+ }
+
+ @Override
+ public final void sendMsgToAllEdges(M msg) {
+ if (msg == null) {
+ throw new IllegalArgumentException(
+ "sendMsgToAllEdges: Cannot send null message to all edges");
+ }
+ for (I index : destEdgeIndexList) {
+ sendMsg(index, msg);
+ }
+ }
+
+ @Override
+ public void addVertexRequest(MutableVertex<I, V, E, M> vertex)
+ throws IOException {
+ getGraphState().getWorkerCommunications().
+ addVertexReq(vertex);
+ }
+
+ @Override
+ public void removeVertexRequest(I vertexId) throws IOException {
+ getGraphState().getWorkerCommunications().
+ removeVertexReq(vertexId);
+ }
+
+ @Override
+ public void addEdgeRequest(I vertexIndex,
+ Edge<I, E> edge) throws IOException {
+ getGraphState().getWorkerCommunications().
+ addEdgeReq(vertexIndex, edge);
+ }
+
+ @Override
+ public void removeEdgeRequest(I sourceVertexId,
+ I destVertexId) throws IOException {
+ getGraphState().getWorkerCommunications().
+ removeEdgeReq(sourceVertexId, destVertexId);
+ }
+
+ @Override
+ final public void readFields(DataInput in) throws IOException {
+ vertexId = BspUtils.<I>createVertexIndex(getConf());
+ vertexId.readFields(in);
+ boolean hasVertexValue = in.readBoolean();
+ if (hasVertexValue) {
+ vertexValue = BspUtils.<V>createVertexValue(getConf());
+ vertexValue.readFields(in);
+ }
+ int edgeListCount = in.readInt();
+ destEdgeIndexList = Lists.newArrayListWithCapacity(edgeListCount);
+ destEdgeValueList = Lists.newArrayListWithCapacity(edgeListCount);
+ for (int i = 0; i < edgeListCount; ++i) {
+ I vertexId = BspUtils.<I>createVertexIndex(getConf());
+ E edgeValue = BspUtils.<E>createEdgeValue(getConf());
+ vertexId.readFields(in);
+ edgeValue.readFields(in);
+ destEdgeIndexList.add(vertexId);
+ destEdgeValueList.add(edgeValue);
+ }
+ int msgListSize = in.readInt();
+ for (int i = 0; i < msgListSize; ++i) {
+ M msg = BspUtils.<M>createMessageValue(getConf());
+ msg.readFields(in);
+ msgList.add(msg);
+ }
+ halt = in.readBoolean();
+ }
+
+ @Override
+ final public void write(DataOutput out) throws IOException {
+ vertexId.write(out);
+ out.writeBoolean(vertexValue != null);
+ if (vertexValue != null) {
+ vertexValue.write(out);
+ }
+ out.writeInt(destEdgeIndexList.size());
+ for (int i = 0 ; i < destEdgeIndexList.size(); ++i) {
+ destEdgeIndexList.get(i).write(out);
+ destEdgeValueList.get(i).write(out);
+ }
+ out.writeInt(msgList.size());
+ for (M msg : msgList) {
+ msg.write(out);
+ }
+ out.writeBoolean(halt);
+ }
+
+ @Override
+ public List<M> getMsgList() {
+ return msgList;
+ }
+
+ @Override
+ public String toString() {
+ return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() +
+ ",#edges=" + getNumOutEdges() + ")";
+ }
+}
+
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1203130&r1=1203129&r2=1203130&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Thu Nov 17 09:50:41 2011
@@ -22,6 +22,7 @@ import org.apache.giraph.bsp.BspInputFor
import org.apache.giraph.bsp.BspOutputFormat;
import org.apache.giraph.graph.partition.GraphPartitionerFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.mapreduce.Job;
import org.apache.log4j.Logger;
@@ -261,7 +262,7 @@ public class GiraphJob extends Job {
public static final Boolean KEEP_ZOOKEEPER_DATA_DEFAULT = false;
/** Default ZooKeeper tick time. */
- public static final int DEFAULT_ZOOKEEPER_TICK_TIME = 2000;
+ public static final int DEFAULT_ZOOKEEPER_TICK_TIME = 6000;
/** Default ZooKeeper init limit (in ticks). */
public static final int DEFAULT_ZOOKEEPER_INIT_LIMIT = 10;
/** Default ZooKeeper sync limit (in ticks). */
@@ -270,10 +271,10 @@ public class GiraphJob extends Job {
public static final int DEFAULT_ZOOKEEPER_SNAP_COUNT = 50000;
/** Default ZooKeeper maximum client connections. */
public static final int DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS = 10000;
- /** Default ZooKeeper minimum session timeout (in msecs). */
- public static final int DEFAULT_ZOOKEEPER_MIN_SESSION_TIMEOUT = 10000;
- /** Default ZooKeeper maximum session timeout (in msecs). */
- public static final int DEFAULT_ZOOKEEPER_MAX_SESSION_TIMEOUT = 100000;
+ /** Default ZooKeeper minimum session timeout of 5 minutes (in msecs). */
+ public static final int DEFAULT_ZOOKEEPER_MIN_SESSION_TIMEOUT = 300*1000;
+ /** Default ZooKeeper maximum session timeout of 10 minutes (in msecs). */
+ public static final int DEFAULT_ZOOKEEPER_MAX_SESSION_TIMEOUT = 600*1000;
/** Class logger */
private static final Logger LOG = Logger.getLogger(GiraphJob.class);
@@ -511,6 +512,10 @@ public class GiraphJob extends Job {
// Speculative execution doesn't make sense for Giraph
conf.setBoolean("mapred.map.tasks.speculative.execution", false);
+ // Set the ping interval to 5 minutes instead of one minute
+ // (DEFAULT_PING_INTERVAL)
+ Client.setPingInterval(conf, 60000*5);
+
if (getJar() == null) {
setJarByClass(GiraphJob.class);
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1203130&r1=1203129&r2=1203130&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java Thu Nov 17 09:50:41 2011
@@ -627,4 +627,25 @@ public class GraphMapper<I extends Writa
ZooKeeperManager.State.FINISHED);
}
}
+
+ @Override
+ public void run(Context context) throws IOException, InterruptedException {
+ // Notify the master quicker if there is worker failure rather than
+ // waiting for ZooKeeper to timeout and delete the ephemeral znodes
+ try {
+ setup(context);
+ while (context.nextKeyValue()) {
+ map(context.getCurrentKey(),
+ context.getCurrentValue(),
+ context);
+ }
+ cleanup(context);
+ } catch (Exception e) {
+ if (mapFunctions == MapFunctions.WORKER_ONLY) {
+ serviceWorker.failureCleanup();
+ }
+ throw new IllegalStateException(
+ "run: Caught an unrecoverable exception " + e.getMessage(), e);
+ }
+ }
}
Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/TestJsonBase64Format.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestJsonBase64Format.java?rev=1203130&r1=1203129&r2=1203130&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestJsonBase64Format.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestJsonBase64Format.java Thu Nov 17 09:50:41 2011
@@ -65,7 +65,7 @@ public class TestJsonBase64Format extend
throws IOException, InterruptedException, ClassNotFoundException {
GiraphJob job = new GiraphJob(getCallingMethodName());
setupConfiguration(job);
- job.setVertexClass(PageRankBenchmark.class);
+ job.setVertexClass(PageRankBenchmark.PageRankVertex.class);
job.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
job.setVertexOutputFormatClass(JsonBase64VertexOutputFormat.class);
job.getConfiguration().setLong(
@@ -79,7 +79,7 @@ public class TestJsonBase64Format extend
job = new GiraphJob(getCallingMethodName());
setupConfiguration(job);
- job.setVertexClass(PageRankBenchmark.class);
+ job.setVertexClass(PageRankBenchmark.PageRankVertex.class);
job.setVertexInputFormatClass(JsonBase64VertexInputFormat.class);
job.setVertexOutputFormatClass(JsonBase64VertexOutputFormat.class);
job.getConfiguration().setInt(PageRankBenchmark.SUPERSTEP_COUNT, 3);
@@ -95,7 +95,7 @@ public class TestJsonBase64Format extend
job = new GiraphJob(getCallingMethodName());
setupConfiguration(job);
- job.setVertexClass(PageRankBenchmark.class);
+ job.setVertexClass(PageRankBenchmark.PageRankVertex.class);
job.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
job.setVertexOutputFormatClass(JsonBase64VertexOutputFormat.class);
job.getConfiguration().setLong(
Added: incubator/giraph/trunk/src/test/java/org/apache/giraph/graph/TestEdgeListVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/graph/TestEdgeListVertex.java?rev=1203130&view=auto
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/graph/TestEdgeListVertex.java (added)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/graph/TestEdgeListVertex.java Thu Nov 17 09:50:41 2011
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.graph;
+
+
+import junit.framework.TestCase;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.utils.WritableUtils;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests {@link EdgeListVertex}.
+ */
+public class TestEdgeListVertex extends TestCase {
+ /** Instantiated vertex filled in from setup() */
+ private IFDLEdgeListVertex vertex;
+ /** Job filled in by setup() */
+ private GiraphJob job;
+
+ /**
+ * Simple instantiable class that extends {@link EdgeListVertex}.
+ */
+ private static class IFDLEdgeListVertex extends
+ EdgeListVertex<IntWritable, FloatWritable, DoubleWritable,
+ LongWritable> {
+
+ @Override
+ public void compute(Iterator<LongWritable> msgIterator)
+ throws IOException {
+ }
+ }
+
+ @Override
+ public void setUp() {
+ try {
+ job = new GiraphJob("TestEdgeArrayVertex");
+ } catch (IOException e) {
+ throw new RuntimeException("setUp: Failed", e);
+ }
+ job.setVertexClass(IFDLEdgeListVertex.class);
+ job.getConfiguration().setClass(GiraphJob.VERTEX_INDEX_CLASS,
+ IntWritable.class, WritableComparable.class);
+ job.getConfiguration().setClass(GiraphJob.VERTEX_VALUE_CLASS,
+ FloatWritable.class, Writable.class);
+ job.getConfiguration().setClass(GiraphJob.EDGE_VALUE_CLASS,
+ DoubleWritable.class, Writable.class);
+ job.getConfiguration().setClass(GiraphJob.MESSAGE_VALUE_CLASS,
+ LongWritable.class, Writable.class);
+ vertex = (IFDLEdgeListVertex)
+ BspUtils.<IntWritable, FloatWritable, DoubleWritable, LongWritable>
+ createVertex(job.getConfiguration());
+ }
+
+ public void testInstantiate() throws IOException {
+ assertNotNull(vertex);
+ }
+
+ public void testEdges() {
+ Map<IntWritable, DoubleWritable> edgeMap = Maps.newHashMap();
+ for (int i = 1000; i > 0; --i) {
+ edgeMap.put(new IntWritable(i), new DoubleWritable(i*2.0));
+ }
+ vertex.initialize(null, null, edgeMap, null);
+ assertEquals(vertex.getNumOutEdges(), 1000);
+ int expectedIndex = 1;
+ for (IntWritable index : vertex) {
+ assertEquals(index.get(), expectedIndex);
+ assertEquals(vertex.getEdgeValue(index).get(),
+ expectedIndex * 2.0d);
+ ++expectedIndex;
+ }
+ assertEquals(vertex.removeEdge(new IntWritable(500)),
+ new DoubleWritable(1000));
+ assertEquals(vertex.getNumOutEdges(), 999);
+ }
+
+ public void testSerialize() {
+ Map<IntWritable, DoubleWritable> edgeMap = Maps.newHashMap();
+ for (int i = 1000; i > 0; --i) {
+ edgeMap.put(new IntWritable(i), new DoubleWritable(i*2.0));
+ }
+ List<LongWritable> messageList = Lists.newArrayList();
+ messageList.add(new LongWritable(4));
+ messageList.add(new LongWritable(5));
+ vertex.initialize(
+ new IntWritable(2), new FloatWritable(3.0f), edgeMap, messageList);
+ byte[] byteArray = WritableUtils.writeToByteArray(vertex);
+ IFDLEdgeListVertex readVertex = (IFDLEdgeListVertex)
+ BspUtils.<IntWritable, FloatWritable, DoubleWritable, LongWritable>
+ createVertex(job.getConfiguration());
+ WritableUtils.readFieldsFromByteArray(byteArray, readVertex);
+ assertEquals(vertex, readVertex);
+ }
+}