You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ap...@apache.org on 2012/11/02 22:37:31 UTC
svn commit: r1405175 [1/3] - in /giraph/trunk: ./
giraph/src/main/java/org/apache/giraph/
giraph/src/main/java/org/apache/giraph/bsp/
giraph/src/main/java/org/apache/giraph/comm/
giraph/src/main/java/org/apache/giraph/comm/netty/ giraph/src/main/java/o...
Author: apresta
Date: Fri Nov 2 21:37:30 2012
New Revision: 1405175
URL: http://svn.apache.org/viewvc?rev=1405175&view=rev
Log:
GIRAPH-155: Allow creation of graph by adding edges that span multiple workers (apresta)
Added:
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BasicVertexValueReader.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeInputFormat.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeInputSplitsCallable.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeInputSplitsCallableFactory.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeReader.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeWithSource.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GiraphInputFormat.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitEvents.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitPaths.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallableFactory.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallable.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallableFactory.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexValueInputFormat.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexValueReader.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/io/GiraphFileInputFormat.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/io/GiraphTextInputFormat.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/io/IntIntTextVertexValueInputFormat.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/io/IntNullTextEdgeInputFormat.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/io/TextEdgeInputFormat.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/io/TextVertexValueInputFormat.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/IntPair.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/TestEdgeInput.java
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerServer.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspUtils.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GiraphJob.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterThread.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexInputFormat.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexReader.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexResolver.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/FileUtils.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/TestJsonBase64Format.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java
Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Fri Nov 2 21:37:30 2012
@@ -1,6 +1,9 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-155: Allow creation of graph by adding edges that span
+ multiple workers (apresta)
+
GIRAPH-398: Missing a dependency (nitay via majakabiljo)
GIRAPH-394: mapreduce.job.user.classpath.first hadoop option typo
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java Fri Nov 2 21:37:30 2012
@@ -19,6 +19,7 @@
package org.apache.giraph;
import org.apache.giraph.graph.AggregatorWriter;
+import org.apache.giraph.graph.EdgeInputFormat;
import org.apache.giraph.graph.MasterCompute;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexCombiner;
@@ -38,16 +39,9 @@ import org.apache.hadoop.conf.Configurat
public class GiraphConfiguration extends Configuration {
/** Vertex class - required */
public static final String VERTEX_CLASS = "giraph.vertexClass";
- /** VertexInputFormat class - required */
- public static final String VERTEX_INPUT_FORMAT_CLASS =
- "giraph.vertexInputFormatClass";
/** Class for Master - optional */
public static final String MASTER_COMPUTE_CLASS = "giraph.masterComputeClass";
-
- /** VertexOutputFormat class - optional */
- public static final String VERTEX_OUTPUT_FORMAT_CLASS =
- "giraph.vertexOutputFormatClass";
/** Vertex combiner class - optional */
public static final String VERTEX_COMBINER_CLASS =
"giraph.combinerClass";
@@ -58,6 +52,18 @@ public class GiraphConfiguration extends
public static final String GRAPH_PARTITIONER_FACTORY_CLASS =
"giraph.graphPartitionerFactoryClass";
+ // At least one of the input format classes is required.
+ /** VertexInputFormat class */
+ public static final String VERTEX_INPUT_FORMAT_CLASS =
+ "giraph.vertexInputFormatClass";
+ /** EdgeInputFormat class */
+ public static final String EDGE_INPUT_FORMAT_CLASS =
+ "giraph.edgeInputFormatClass";
+
+ /** VertexOutputFormat class */
+ public static final String VERTEX_OUTPUT_FORMAT_CLASS =
+ "giraph.vertexOutputFormatClass";
+
/** Vertex index class */
public static final String VERTEX_ID_CLASS = "giraph.vertexIdClass";
/** Vertex value class */
@@ -425,18 +431,29 @@ public class GiraphConfiguration extends
public static final float INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT = 100f;
/**
- * To limit outlier input splits from producing too many vertices or to
- * help with testing, the number of vertices loaded from an input split can
- * be limited. By default, everything is loaded.
+ * To limit outlier vertex input splits from producing too many vertices or
+ * to help with testing, the number of vertices loaded from an input split
+ * can be limited. By default, everything is loaded.
*/
public static final String INPUT_SPLIT_MAX_VERTICES =
"giraph.InputSplitMaxVertices";
/**
- * Default is that all the vertices are to be loaded from the input
- * split
+ * Default is that all the vertices are to be loaded from the input split
*/
public static final long INPUT_SPLIT_MAX_VERTICES_DEFAULT = -1;
+ /**
+ * To limit outlier edge input splits from producing too many edges or
+ * to help with testing, the number of edges loaded from an input split
+ * can be limited. By default, everything is loaded.
+ */
+ public static final String INPUT_SPLIT_MAX_EDGES =
+ "giraph.InputSplitMaxEdges";
+ /**
+ * Default is that all the edges are to be loaded from the input split
+ */
+ public static final long INPUT_SPLIT_MAX_EDGES_DEFAULT = -1;
+
/** Java opts passed to ZooKeeper startup */
public static final String ZOOKEEPER_JAVA_OPTS =
"giraph.zkJavaOpts";
@@ -632,6 +649,18 @@ public class GiraphConfiguration extends
}
/**
+ * Set the edge input format class (at least one input format is required)
+ *
+ * @param edgeInputFormatClass Determines how graph is input
+ */
+ public final void setEdgeInputFormatClass(
+ Class<? extends EdgeInputFormat> edgeInputFormatClass) {
+ setClass(EDGE_INPUT_FORMAT_CLASS,
+ edgeInputFormatClass,
+ EdgeInputFormat.class);
+ }
+
+ /**
* Set the master class (optional)
*
* @param masterComputeClass Runs master computation
@@ -937,4 +966,8 @@ public class GiraphConfiguration extends
public long getInputSplitMaxVertices() {
return getLong(INPUT_SPLIT_MAX_VERTICES, INPUT_SPLIT_MAX_VERTICES_DEFAULT);
}
+
+ public long getInputSplitMaxEdges() {
+ return getLong(INPUT_SPLIT_MAX_EDGES, INPUT_SPLIT_MAX_EDGES_DEFAULT);
+ }
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java Fri Nov 2 21:37:30 2012
@@ -18,10 +18,10 @@
package org.apache.giraph;
-import java.util.List;
import org.apache.giraph.graph.AggregatorWriter;
import org.apache.giraph.graph.DefaultMasterCompute;
import org.apache.giraph.graph.DefaultWorkerContext;
+import org.apache.giraph.graph.EdgeInputFormat;
import org.apache.giraph.graph.GraphState;
import org.apache.giraph.graph.MasterCompute;
import org.apache.giraph.graph.TextAggregatorWriter;
@@ -41,6 +41,8 @@ import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import java.util.List;
+
/**
* The classes set here are immutable, the remaining configuration is mutable.
* Classes are immutable and final to provide the best performance for
@@ -77,6 +79,10 @@ public class ImmutableClassesGiraphConfi
/** Vertex output format class - cached for fast access */
private final Class<? extends VertexOutputFormat<I, V, E>>
vertexOutputFormatClass;
+ /** Edge input format class - cached for fast access */
+ private final Class<? extends EdgeInputFormat<I, E>>
+ edgeInputFormatClass;
+
/** Aggregator writer class - cached for fast access */
private final Class<? extends AggregatorWriter> aggregatorWriterClass;
@@ -124,6 +130,9 @@ public class ImmutableClassesGiraphConfi
vertexOutputFormatClass = (Class<? extends VertexOutputFormat<I, V, E>>)
conf.getClass(VERTEX_OUTPUT_FORMAT_CLASS,
null, VertexOutputFormat.class);
+ edgeInputFormatClass = (Class<? extends EdgeInputFormat<I, E>>)
+ conf.getClass(EDGE_INPUT_FORMAT_CLASS,
+ null, EdgeInputFormat.class);
aggregatorWriterClass = conf.getClass(AGGREGATOR_WRITER_CLASS,
TextAggregatorWriter.class, AggregatorWriter.class);
@@ -169,6 +178,15 @@ public class ImmutableClassesGiraphConfi
}
/**
+ * Does the job have a {@link VertexInputFormat}?
+ *
+ * @return True iff a {@link VertexInputFormat} has been specified.
+ */
+ public boolean hasVertexInputFormat() {
+ return vertexInputFormatClass != null;
+ }
+
+ /**
* Get the user's subclassed
* {@link org.apache.giraph.graph.VertexInputFormat}.
*
@@ -211,6 +229,34 @@ public class ImmutableClassesGiraphConfi
}
/**
+ * Does the job have an {@link EdgeInputFormat}?
+ *
+ * @return True iff an {@link EdgeInputFormat} has been specified.
+ */
+ public boolean hasEdgeInputFormat() {
+ return edgeInputFormatClass != null;
+ }
+
+ /**
+ * Get the user's subclassed
+ * {@link org.apache.giraph.graph.EdgeInputFormat}.
+ *
+ * @return User's edge input format class
+ */
+ public Class<? extends EdgeInputFormat<I, E>> getEdgeInputFormatClass() {
+ return edgeInputFormatClass;
+ }
+
+ /**
+ * Create an instance of the user's edge input format class
+ *
+ * @return Instantiated user edge input format class
+ */
+ public EdgeInputFormat<I, E> createEdgeInputFormat() {
+ return ReflectionUtils.newInstance(edgeInputFormatClass, this);
+ }
+
+ /**
* Get the user's subclassed {@link org.apache.giraph.graph.AggregatorWriter}.
*
* @return User's aggregator writer class
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java Fri Nov 2 21:37:30 2012
@@ -54,10 +54,20 @@ public interface CentralizedServiceMaste
* user-defined VertexInputFormat. The {@link InputSplit} objects will
* processed by the workers later on during the INPUT_SUPERSTEP.
*
- * @return Number of partitions. Returns -1 on failure to create
+ * @return Number of splits. Returns -1 on failure to create
* valid input splits.
*/
- int createInputSplits();
+ int createVertexInputSplits();
+
+ /**
+ * Create the {@link InputSplit} objects from the index range based on the
+ * user-defined EdgeInputFormat. The {@link InputSplit} objects will
+ * be processed by the workers later on during the INPUT_SUPERSTEP.
+ *
+ * @return Number of splits. Returns -1 on failure to create
+ * valid input splits.
+ */
+ int createEdgeInputSplits();
/**
* Master coordinates the superstep
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerServer.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerServer.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerServer.java Fri Nov 2 21:37:30 2012
@@ -44,14 +44,20 @@ public interface WorkerServer<I extends
int getPort();
/**
- * Move the in transition messages to the in messages for every vertex and
- * add new connections to any newly appearing IPC proxies.
+ * Prepare incoming messages for computation, and resolve mutation requests.
*
* @param graphState Current graph state
*/
void prepareSuperstep(GraphState<I, V, E, M> graphState);
/**
+ * Only resolve mutation requests (used for edge-oriented input).
+ *
+ * @param graphState Current graph state
+ */
+ void resolveMutations(GraphState<I, V, E, M> graphState);
+
+ /**
* Get server data
*
* @return Server data
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java Fri Nov 2 21:37:30 2012
@@ -17,8 +17,6 @@
*/
package org.apache.giraph.comm.netty;
-import com.google.common.collect.Maps;
-import com.yammer.metrics.core.Histogram;
import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.bsp.CentralizedServiceWorker;
@@ -49,6 +47,9 @@ import org.apache.hadoop.io.WritableComp
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
+import com.google.common.collect.Maps;
+import com.yammer.metrics.core.Histogram;
+
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java Fri Nov 2 21:37:30 2012
@@ -118,7 +118,11 @@ public class NettyWorkerServer<I extends
@Override
public void prepareSuperstep(GraphState<I, V, E, M> graphState) {
serverData.prepareSuperstep();
+ resolveMutations(graphState);
+ }
+ @Override
+ public void resolveMutations(GraphState<I, V, E, M> graphState) {
Set<I> resolveVertexIndexSet = Sets.newHashSet();
// Keep track of the vertices which are not here but have received messages
for (Integer partitionId : service.getPartitionStore().getPartitionIds()) {
@@ -180,8 +184,8 @@ public class NettyWorkerServer<I extends
if (partition == null) {
throw new IllegalStateException(
"prepareSuperstep: No partition for index " + vertexIndex +
- " in " + service.getPartitionStore() + " should have been " +
- service.getVertexPartitionOwner(vertexIndex));
+ " in " + service.getPartitionStore() + " should have been " +
+ service.getVertexPartitionOwner(vertexIndex));
}
if (vertex != null) {
partition.putVertex(vertex);
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java Fri Nov 2 21:37:30 2012
@@ -18,13 +18,6 @@
package org.apache.giraph.comm.requests;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.graph.VertexMutations;
import org.apache.hadoop.io.Writable;
@@ -33,6 +26,13 @@ import org.apache.log4j.Logger;
import com.google.common.collect.Maps;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
/**
* Send a collection of vertex mutations for a partition.
*
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BasicVertexValueReader.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BasicVertexValueReader.java?rev=1405175&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BasicVertexValueReader.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BasicVertexValueReader.java Fri Nov 2 21:37:30 2012
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+
+/**
+ * Abstract base class for VertexValueReader.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+public abstract class BasicVertexValueReader<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable>
+ implements VertexReader<I, V, E, M> {
+ /**
+ * User-defined method to extract the vertex id.
+ *
+ * @return The vertex id
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public abstract I getCurrentVertexId() throws IOException,
+ InterruptedException;
+
+ /**
+ * User-defined method to extract the vertex value.
+ *
+ * @return The vertex value
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public abstract V getCurrentVertexValue() throws IOException,
+ InterruptedException;
+}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java Fri Nov 2 21:37:30 2012
@@ -72,22 +72,40 @@ public abstract class BspService<I exten
public static final String BASE_DIR = "/_hadoopBsp";
/** Master job state znode above base dir */
public static final String MASTER_JOB_STATE_NODE = "/_masterJobState";
- /** Input split directory about base dir */
- public static final String INPUT_SPLIT_DIR = "/_inputSplitDir";
- /** Input split done directory about base dir */
- public static final String INPUT_SPLIT_DONE_DIR = "/_inputSplitDoneDir";
- /** Denotes a reserved input split */
- public static final String INPUT_SPLIT_RESERVED_NODE =
- "/_inputSplitReserved";
- /** Denotes a finished input split */
- public static final String INPUT_SPLIT_FINISHED_NODE =
- "/_inputSplitFinished";
- /** Denotes that all the input splits are are ready for consumption */
- public static final String INPUT_SPLITS_ALL_READY_NODE =
- "/_inputSplitsAllReady";
- /** Denotes that all the input splits are done. */
- public static final String INPUT_SPLITS_ALL_DONE_NODE =
- "/_inputSplitsAllDone";
+ /** Vertex input split directory above base dir */
+ public static final String VERTEX_INPUT_SPLIT_DIR = "/_vertexInputSplitDir";
+ /** Vertex input split done directory above base dir */
+ public static final String VERTEX_INPUT_SPLIT_DONE_DIR =
+ "/_vertexInputSplitDoneDir";
+ /** Denotes a reserved vertex input split */
+ public static final String VERTEX_INPUT_SPLIT_RESERVED_NODE =
+ "/_vertexInputSplitReserved";
+ /** Denotes a finished vertex input split */
+ public static final String VERTEX_INPUT_SPLIT_FINISHED_NODE =
+ "/_vertexInputSplitFinished";
+ /** Denotes that all the vertex input splits are ready for consumption */
+ public static final String VERTEX_INPUT_SPLITS_ALL_READY_NODE =
+ "/_vertexInputSplitsAllReady";
+ /** Denotes that all the vertex input splits are done. */
+ public static final String VERTEX_INPUT_SPLITS_ALL_DONE_NODE =
+ "/_vertexInputSplitsAllDone";
+ /** Edge input split directory above base dir */
+ public static final String EDGE_INPUT_SPLIT_DIR = "/_edgeInputSplitDir";
+ /** Edge input split done directory above base dir */
+ public static final String EDGE_INPUT_SPLIT_DONE_DIR =
+ "/_edgeInputSplitDoneDir";
+ /** Denotes a reserved edge input split */
+ public static final String EDGE_INPUT_SPLIT_RESERVED_NODE =
+ "/_edgeInputSplitReserved";
+ /** Denotes a finished edge input split */
+ public static final String EDGE_INPUT_SPLIT_FINISHED_NODE =
+ "/_edgeInputSplitFinished";
+ /** Denotes that all the edge input splits are ready for consumption */
+ public static final String EDGE_INPUT_SPLITS_ALL_READY_NODE =
+ "/_edgeInputSplitsAllReady";
+ /** Denotes that all the edge input splits are done. */
+ public static final String EDGE_INPUT_SPLITS_ALL_DONE_NODE =
+ "/_edgeInputSplitsAllDone";
/** Directory of attempts of this application */
public static final String APPLICATION_ATTEMPTS_DIR =
"/_applicationAttemptsDir";
@@ -120,32 +138,8 @@ public abstract class BspService<I exten
/** JSON partition stats key */
public static final String JSONOBJ_PARTITION_STATS_KEY =
"_partitionStatsKey";
- /** JSON finished vertices key */
- public static final String JSONOBJ_FINISHED_VERTICES_KEY =
- "_verticesFinishedKey";
- /** JSON vertex count key */
- public static final String JSONOBJ_NUM_VERTICES_KEY = "_numVerticesKey";
- /** JSON edge count key */
- public static final String JSONOBJ_NUM_EDGES_KEY = "_numEdgesKey";
/** JSON message count key */
public static final String JSONOBJ_NUM_MESSAGES_KEY = "_numMsgsKey";
- /** JSON hostname id key */
- public static final String JSONOBJ_HOSTNAME_ID_KEY = "_hostnameIdKey";
- /** JSON max vertex index key */
- public static final String JSONOBJ_MAX_VERTEX_INDEX_KEY =
- "_maxVertexIndexKey";
- /** JSON hostname key */
- public static final String JSONOBJ_HOSTNAME_KEY = "_hostnameKey";
- /** JSON port key */
- public static final String JSONOBJ_PORT_KEY = "_portKey";
- /** JSON checkpoint file prefix key */
- public static final String JSONOBJ_CHECKPOINT_FILE_PREFIX_KEY =
- "_checkpointFilePrefixKey";
- /** JSON previous hostname key */
- public static final String JSONOBJ_PREVIOUS_HOSTNAME_KEY =
- "_previousHostnameKey";
- /** JSON previous port key */
- public static final String JSONOBJ_PREVIOUS_PORT_KEY = "_previousPortKey";
/** JSON state key */
public static final String JSONOBJ_STATE_KEY = "_stateKey";
/** JSON application attempt key */
@@ -181,14 +175,14 @@ public abstract class BspService<I exten
protected final String basePath;
/** Path to the job state determined by the master (informative only) */
protected final String masterJobStatePath;
- /** Path to the input splits written by the master */
- protected final String inputSplitsPath;
- /** Path to the input splits all ready to be processed by workers */
- protected final String inputSplitsAllReadyPath;
- /** Path to the input splits done */
- protected final String inputSplitsDonePath;
- /** Path to the input splits all done to notify the workers to proceed */
- protected final String inputSplitsAllDonePath;
+ /** ZooKeeper paths for vertex input splits. */
+ protected final InputSplitPaths vertexInputSplitsPaths;
+ /** ZooKeeper paths for edge input splits. */
+ protected final InputSplitPaths edgeInputSplitsPaths;
+ /** Vertex input split events. */
+ protected final InputSplitEvents vertexInputSplitsEvents;
+ /** Edge input split events. */
+ protected final InputSplitEvents edgeInputSplitsEvents;
/** Path to the application attempts) */
protected final String applicationAttemptsPath;
/** Path to the cleaned up notifications */
@@ -203,14 +197,6 @@ public abstract class BspService<I exten
private final BspEvent connectedEvent;
/** Has worker registration changed (either healthy or unhealthy) */
private final BspEvent workerHealthRegistrationChanged;
- /** InputSplits are ready for consumption by workers */
- private final BspEvent inputSplitsAllReadyChanged;
- /** InputSplit reservation or finished notification and synchronization */
- private final BspEvent inputSplitsStateChanged;
- /** InputSplits are done being processed by workers */
- private final BspEvent inputSplitsAllDoneChanged;
- /** InputSplit done by a worker finished notification and synchronization */
- private final BspEvent inputSplitsDoneStateChanged;
/** Are the addresses and partition assignments to workers ready? */
private final BspEvent addressesAndPartitionsReadyChanged;
/** Application attempt changed */
@@ -263,12 +249,10 @@ public abstract class BspService<I exten
int sessionMsecTimeout,
Mapper<?, ?, ?, ?>.Context context,
GraphMapper<I, V, E, M> graphMapper) {
+ this.vertexInputSplitsEvents = new InputSplitEvents(context);
+ this.edgeInputSplitsEvents = new InputSplitEvents(context);
this.connectedEvent = new PredicateLock(context);
this.workerHealthRegistrationChanged = new PredicateLock(context);
- this.inputSplitsAllReadyChanged = new PredicateLock(context);
- this.inputSplitsStateChanged = new PredicateLock(context);
- this.inputSplitsAllDoneChanged = new PredicateLock(context);
- this.inputSplitsDoneStateChanged = new PredicateLock(context);
this.addressesAndPartitionsReadyChanged = new PredicateLock(context);
this.applicationAttemptChanged = new PredicateLock(context);
this.superstepFinished = new PredicateLock(context);
@@ -277,8 +261,10 @@ public abstract class BspService<I exten
registerBspEvent(connectedEvent);
registerBspEvent(workerHealthRegistrationChanged);
- registerBspEvent(inputSplitsAllReadyChanged);
- registerBspEvent(inputSplitsStateChanged);
+ registerBspEvent(vertexInputSplitsEvents.getAllReadyChanged());
+ registerBspEvent(vertexInputSplitsEvents.getStateChanged());
+ registerBspEvent(edgeInputSplitsEvents.getAllReadyChanged());
+ registerBspEvent(edgeInputSplitsEvents.getStateChanged());
registerBspEvent(addressesAndPartitionsReadyChanged);
registerBspEvent(applicationAttemptChanged);
registerBspEvent(superstepFinished);
@@ -315,10 +301,12 @@ public abstract class BspService<I exten
basePath = ZooKeeperManager.getBasePath(conf) + BASE_DIR + "/" + jobId;
masterJobStatePath = basePath + MASTER_JOB_STATE_NODE;
- inputSplitsPath = basePath + INPUT_SPLIT_DIR;
- inputSplitsAllReadyPath = basePath + INPUT_SPLITS_ALL_READY_NODE;
- inputSplitsDonePath = basePath + INPUT_SPLIT_DONE_DIR;
- inputSplitsAllDonePath = basePath + INPUT_SPLITS_ALL_DONE_NODE;
+ vertexInputSplitsPaths = new InputSplitPaths(basePath,
+ VERTEX_INPUT_SPLIT_DIR, VERTEX_INPUT_SPLIT_DONE_DIR,
+ VERTEX_INPUT_SPLITS_ALL_READY_NODE, VERTEX_INPUT_SPLITS_ALL_DONE_NODE);
+ edgeInputSplitsPaths = new InputSplitPaths(basePath,
+ EDGE_INPUT_SPLIT_DIR, EDGE_INPUT_SPLIT_DONE_DIR,
+ EDGE_INPUT_SPLITS_ALL_READY_NODE, EDGE_INPUT_SPLITS_ALL_DONE_NODE);
applicationAttemptsPath = basePath + APPLICATION_ATTEMPTS_DIR;
cleanedUpPath = basePath + CLEANED_UP_DIR;
checkpointBasePath = getConfiguration().get(
@@ -498,18 +486,6 @@ public abstract class BspService<I exten
}
/**
- * Generate the merged aggregator directory path for a superstep
- *
- * @param attempt application attempt number
- * @param superstep superstep to use
- * @return directory path based on the a superstep
- */
- public final String getMergedAggregatorPath(long attempt, long superstep) {
- return applicationAttemptsPath + "/" + attempt +
- SUPERSTEP_DIR + "/" + superstep + MERGED_AGGREGATOR_DIR;
- }
-
- /**
* Generate the "superstep finished" directory path for a superstep
*
* @param attempt application attempt number
@@ -636,22 +612,6 @@ public abstract class BspService<I exten
return workerHealthRegistrationChanged;
}
- public final BspEvent getInputSplitsAllReadyEvent() {
- return inputSplitsAllReadyChanged;
- }
-
- public final BspEvent getInputSplitsStateChangedEvent() {
- return inputSplitsStateChanged;
- }
-
- public final BspEvent getInputSplitsAllDoneEvent() {
- return inputSplitsAllDoneChanged;
- }
-
- public final BspEvent getInputSplitsDoneStateChangedEvent() {
- return inputSplitsDoneStateChanged;
- }
-
public final BspEvent getAddressesAndPartitionsReadyChangedEvent() {
return addressesAndPartitionsReadyChanged;
}
@@ -962,53 +922,105 @@ public abstract class BspService<I exten
}
workerHealthRegistrationChanged.signal();
eventProcessed = true;
- } else if (event.getPath().equals(inputSplitsAllReadyPath) &&
+ } else if (event.getPath().equals(
+ vertexInputSplitsPaths.getAllReadyPath()) &&
(event.getType() == EventType.NodeCreated)) {
if (LOG.isInfoEnabled()) {
LOG.info("process: inputSplitsReadyChanged " +
"(input splits ready)");
}
- inputSplitsAllReadyChanged.signal();
+ vertexInputSplitsEvents.getAllReadyChanged().signal();
+ eventProcessed = true;
+ } else if (event.getPath().endsWith(VERTEX_INPUT_SPLIT_RESERVED_NODE) &&
+ (event.getType() == EventType.NodeCreated)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("process: vertexInputSplitsStateChanged " +
+ "(made a reservation)");
+ }
+ vertexInputSplitsEvents.getStateChanged().signal();
+ eventProcessed = true;
+ } else if (event.getPath().endsWith(VERTEX_INPUT_SPLIT_RESERVED_NODE) &&
+ (event.getType() == EventType.NodeDeleted)) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("process: vertexInputSplitsStateChanged " +
+ "(lost a reservation)");
+ }
+ vertexInputSplitsEvents.getStateChanged().signal();
+ eventProcessed = true;
+ } else if (event.getPath().endsWith(VERTEX_INPUT_SPLIT_FINISHED_NODE) &&
+ (event.getType() == EventType.NodeCreated)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("process: vertexInputSplitsStateChanged " +
+ "(finished inputsplit)");
+ }
+ vertexInputSplitsEvents.getStateChanged().signal();
+ eventProcessed = true;
+ } else if (event.getPath().endsWith(VERTEX_INPUT_SPLIT_DONE_DIR) &&
+ (event.getType() == EventType.NodeChildrenChanged)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("process: vertexInputSplitsDoneStateChanged " +
+ "(worker finished sending)");
+ }
+ vertexInputSplitsEvents.getDoneStateChanged().signal();
+ eventProcessed = true;
+ } else if (event.getPath().equals(
+ vertexInputSplitsPaths.getAllDonePath()) &&
+ (event.getType() == EventType.NodeCreated)) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("process: vertexInputSplitsAllDoneChanged " +
+ "(all vertices sent from input splits)");
+ }
+ vertexInputSplitsEvents.getAllDoneChanged().signal();
+ eventProcessed = true;
+ } else if (event.getPath().equals(
+ edgeInputSplitsPaths.getAllReadyPath()) &&
+ (event.getType() == EventType.NodeCreated)) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("process: edgeInputSplitsReadyChanged " +
+ "(input splits ready)");
+ }
+ edgeInputSplitsEvents.getAllReadyChanged().signal();
eventProcessed = true;
- } else if (event.getPath().endsWith(INPUT_SPLIT_RESERVED_NODE) &&
+ } else if (event.getPath().endsWith(EDGE_INPUT_SPLIT_RESERVED_NODE) &&
(event.getType() == EventType.NodeCreated)) {
if (LOG.isDebugEnabled()) {
- LOG.debug("process: inputSplitsStateChanged " +
+ LOG.debug("process: edgeInputSplitsStateChanged " +
"(made a reservation)");
}
- inputSplitsStateChanged.signal();
+ edgeInputSplitsEvents.getStateChanged().signal();
eventProcessed = true;
- } else if (event.getPath().endsWith(INPUT_SPLIT_RESERVED_NODE) &&
+ } else if (event.getPath().endsWith(EDGE_INPUT_SPLIT_RESERVED_NODE) &&
(event.getType() == EventType.NodeDeleted)) {
if (LOG.isInfoEnabled()) {
- LOG.info("process: inputSplitsStateChanged " +
+ LOG.info("process: edgeInputSplitsStateChanged " +
"(lost a reservation)");
}
- inputSplitsStateChanged.signal();
+ edgeInputSplitsEvents.getStateChanged().signal();
eventProcessed = true;
- } else if (event.getPath().endsWith(INPUT_SPLIT_FINISHED_NODE) &&
+ } else if (event.getPath().endsWith(EDGE_INPUT_SPLIT_FINISHED_NODE) &&
(event.getType() == EventType.NodeCreated)) {
if (LOG.isDebugEnabled()) {
- LOG.debug("process: inputSplitsStateChanged " +
+ LOG.debug("process: edgeInputSplitsStateChanged " +
"(finished inputsplit)");
}
- inputSplitsStateChanged.signal();
+ edgeInputSplitsEvents.getStateChanged().signal();
eventProcessed = true;
- } else if (event.getPath().endsWith(INPUT_SPLIT_DONE_DIR) &&
+ } else if (event.getPath().endsWith(EDGE_INPUT_SPLIT_DONE_DIR) &&
(event.getType() == EventType.NodeChildrenChanged)) {
if (LOG.isDebugEnabled()) {
- LOG.debug("process: inputSplitsDoneStateChanged " +
+ LOG.debug("process: edgeInputSplitsDoneStateChanged " +
"(worker finished sending)");
}
- inputSplitsDoneStateChanged.signal();
+ edgeInputSplitsEvents.getDoneStateChanged().signal();
eventProcessed = true;
- } else if (event.getPath().equals(inputSplitsAllDonePath) &&
+ } else if (event.getPath().equals(
+ edgeInputSplitsPaths.getAllDonePath()) &&
(event.getType() == EventType.NodeCreated)) {
if (LOG.isInfoEnabled()) {
- LOG.info("process: inputSplitsAllDoneChanged " +
+ LOG.info("process: edgeInputSplitsAllDoneChanged " +
"(all vertices sent from input splits)");
}
- inputSplitsAllDoneChanged.signal();
+ edgeInputSplitsEvents.getAllDoneChanged().signal();
eventProcessed = true;
} else if (event.getPath().contains(ADDRESSES_AND_PARTITIONS_DIR) &&
event.getType() == EventType.NodeCreated) {
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Fri Nov 2 21:37:30 2012
@@ -18,7 +18,6 @@
package org.apache.giraph.graph;
-import com.google.common.collect.Sets;
import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.bsp.ApplicationState;
import org.apache.giraph.bsp.BspInputFormat;
@@ -57,6 +56,7 @@ import org.json.JSONException;
import org.json.JSONObject;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import net.iharder.Base64;
import java.io.ByteArrayOutputStream;
@@ -237,47 +237,44 @@ public class BspServiceMaster<I extends
}
/**
- * Master uses this to calculate the {@link VertexInputFormat}
- * input splits and write it to ZooKeeper.
+ * Common method for generating vertex/edge input splits.
+ * If INPUT_SPLIT_SAMPLE_PERCENT is set to something other than its
+ * default, only a leading sample of the generated splits is returned.
*
+ * @param inputFormat The vertex/edge input format
* @param numWorkers Number of available workers
- * @return List of input splits
- * @throws InstantiationException
- * @throws IllegalAccessException
- * @throws IOException
- * @throws InterruptedException
+ * @param inputSplitType Type of input splits (for logging purposes)
+ * @return List of input splits for the given format (possibly a sample)
+ * @throws IllegalStateException if the input format fails to compute its
+ *         splits (IOException or InterruptedException)
*/
- private List<InputSplit> generateInputSplits(int numWorkers) {
- VertexInputFormat<I, V, E, M> vertexInputFormat =
- getConfiguration().createVertexInputFormat();
+ private List<InputSplit> generateInputSplits(GiraphInputFormat inputFormat,
+ int numWorkers,
+ String inputSplitType) {
+ String logPrefix = "generate" + inputSplitType + "InputSplits";
List<InputSplit> splits;
try {
- splits = vertexInputFormat.getSplits(getContext(), numWorkers);
- float samplePercent =
- getConfiguration().getFloat(
- GiraphConfiguration.INPUT_SPLIT_SAMPLE_PERCENT,
- GiraphConfiguration.INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT);
- if (samplePercent !=
- GiraphConfiguration.INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT) {
- int lastIndex = (int) (samplePercent * splits.size() / 100f);
- List<InputSplit> sampleSplits = splits.subList(0, lastIndex);
- LOG.warn("generateInputSplits: Using sampling - Processing " +
- "only " + sampleSplits.size() + " instead of " +
- splits.size() + " expected splits.");
- return sampleSplits;
- } else {
- if (LOG.isInfoEnabled()) {
- LOG.info("generateInputSplits: Got " + splits.size() +
- " input splits for " + numWorkers + " workers");
- }
- return splits;
- }
+ splits = inputFormat.getSplits(getContext(), numWorkers);
} catch (IOException e) {
- throw new IllegalStateException(
- "generateInputSplits: Got IOException", e);
+ throw new IllegalStateException(logPrefix + ": Got IOException", e);
} catch (InterruptedException e) {
throw new IllegalStateException(
- "generateInputSplits: Got InterruptedException", e);
+ logPrefix + ": Got InterruptedException", e);
+ }
+ float samplePercent =
+ getConfiguration().getFloat(
+ GiraphConfiguration.INPUT_SPLIT_SAMPLE_PERCENT,
+ GiraphConfiguration.INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT);
+ if (samplePercent !=
+ GiraphConfiguration.INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT) {
+ // Clamp to [0, splits.size()] so that an out-of-range percentage
+ // (negative, or above 100) cannot make subList() throw.
+ int lastIndex = Math.max(0, Math.min(splits.size(),
+ (int) (samplePercent * splits.size() / 100f)));
+ List<InputSplit> sampleSplits = splits.subList(0, lastIndex);
+ LOG.warn(logPrefix + ": Using sampling - Processing only " +
+ sampleSplits.size() + " instead of " + splits.size() +
+ " expected splits.");
+ return sampleSplits;
+ } else {
+ if (LOG.isInfoEnabled()) {
+ LOG.info(logPrefix + ": Got " + splits.size() +
+ " input splits for " + numWorkers + " workers");
+ }
+ return splits;
}
}
@@ -508,31 +505,39 @@ public class BspServiceMaster<I extends
}
}
- @Override
- public int createInputSplits() {
+ /**
+ * Common method for creating vertex/edge input splits.
+ *
+ * @param inputFormat The vertex/edge input format
+ * @param inputSplitPaths ZooKeeper input split paths
+ * @param inputSplitType Type of input split (for logging purposes)
+ * @return Number of splits. Returns -1 on failure to create
+ * valid input splits.
+ */
+ private int createInputSplits(GiraphInputFormat inputFormat,
+ InputSplitPaths inputSplitPaths,
+ String inputSplitType) {
+ String logPrefix = "create" + inputSplitType + "InputSplits";
// Only the 'master' should be doing this. Wait until the number of
// processes that have reported health exceeds the minimum percentage.
// If the minimum percentage is not met, fail the job. Otherwise
// generate the input splits
+ String inputSplitsPath = inputSplitPaths.getPath();
try {
if (getZkExt().exists(inputSplitsPath, false) != null) {
- LOG.info(inputSplitsPath +
- " already exists, no need to create");
+ LOG.info(inputSplitsPath + " already exists, no need to create");
return Integer.parseInt(
- new String(
- getZkExt().getData(inputSplitsPath, false, null)));
+ new String(getZkExt().getData(inputSplitsPath, false, null)));
}
} catch (KeeperException.NoNodeException e) {
if (LOG.isInfoEnabled()) {
- LOG.info("createInputSplits: Need to create the " +
- "input splits at " + inputSplitsPath);
+ LOG.info(logPrefix + ": Need to create the input splits at " +
+ inputSplitsPath);
}
} catch (KeeperException e) {
- throw new IllegalStateException(
- "createInputSplits: KeeperException", e);
+ throw new IllegalStateException(logPrefix + ": KeeperException", e);
} catch (InterruptedException e) {
- throw new IllegalStateException(
- "createInputSplits: InterrtupedException", e);
+ throw new IllegalStateException(logPrefix + ": InterrtupedException", e);
}
// When creating znodes, in case the master has already run, resume
@@ -545,19 +550,18 @@ public class BspServiceMaster<I extends
// Note that the input splits may only be a sample if
// INPUT_SPLIT_SAMPLE_PERCENT is set to something other than 100
- List<InputSplit> splitList =
- generateInputSplits(healthyWorkerInfoList.size());
+ List<InputSplit> splitList = generateInputSplits(inputFormat,
+ healthyWorkerInfoList.size(), inputSplitType);
+
if (splitList.isEmpty()) {
- LOG.fatal("createInputSplits: Failing job due to 0 input splits, " +
- "check input of " +
- getConfiguration().getVertexInputFormatClass().getName() + "!");
+ LOG.fatal(logPrefix + ": Failing job due to 0 input splits, " +
+ "check input of " + inputFormat.getClass().getName() + "!");
getContext().setStatus("Failing job due to 0 input splits, " +
- "check input of " +
- getConfiguration().getVertexInputFormatClass().getName() + "!");
+ "check input of " + inputFormat.getClass().getName() + "!");
failJob();
}
if (healthyWorkerInfoList.size() > splitList.size()) {
- LOG.warn("createInputSplits: Number of inputSplits=" +
+ LOG.warn(logPrefix + ": Number of inputSplits=" +
splitList.size() + " < " +
healthyWorkerInfoList.size() +
"=number of healthy processes, " +
@@ -569,43 +573,65 @@ public class BspServiceMaster<I extends
INPUT_SPLIT_THREAD_COUNT,
DEFAULT_INPUT_SPLIT_THREAD_COUNT);
if (LOG.isInfoEnabled()) {
- LOG.info("createInputSplits: Starting to write input split data to " +
- "zookeeper with " + inputSplitThreadCount + " threads");
+ LOG.info(logPrefix + ": Starting to write input split data " +
+ "to zookeeper with " + inputSplitThreadCount + " threads");
}
ExecutorService taskExecutor =
Executors.newFixedThreadPool(inputSplitThreadCount);
for (int i = 0; i < splitList.size(); ++i) {
InputSplit inputSplit = splitList.get(i);
- taskExecutor.submit(new WriteInputSplit(inputSplit, i));
+ taskExecutor.submit(new WriteInputSplit(inputSplit, inputSplitsPath, i));
}
taskExecutor.shutdown();
ProgressableUtils.awaitExecutorTermination(taskExecutor, getContext());
if (LOG.isInfoEnabled()) {
- LOG.info("createInputSplits: Done writing input split data to zookeeper");
+ LOG.info(logPrefix + ": Done writing input split data to zookeeper");
}
// Let workers know they can start trying to load the input splits
try {
- getZkExt().createExt(inputSplitsAllReadyPath,
- null,
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT,
- false);
+ getZkExt().createExt(inputSplitPaths.getAllReadyPath(),
+ null,
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT,
+ false);
} catch (KeeperException.NodeExistsException e) {
- LOG.info("createInputSplits: Node " +
- inputSplitsAllReadyPath + " already exists.");
+ LOG.info(logPrefix + ": Node " +
+ inputSplitPaths.getAllReadyPath() + " already exists.");
} catch (KeeperException e) {
- throw new IllegalStateException(
- "createInputSplits: KeeperException", e);
+ throw new IllegalStateException(logPrefix + ": KeeperException", e);
} catch (InterruptedException e) {
- throw new IllegalStateException(
- "createInputSplits: IllegalStateException", e);
+ throw new IllegalStateException(logPrefix + ": IllegalStateException", e);
}
return splitList.size();
}
@Override
+ public int createVertexInputSplits() {
+ // Jobs without a vertex input format have no vertex splits to create
+ if (getConfiguration().hasVertexInputFormat()) {
+ VertexInputFormat<I, V, E, M> vertexFormat =
+ getConfiguration().createVertexInputFormat();
+ return createInputSplits(vertexFormat, vertexInputSplitsPaths,
+ "Vertex");
+ }
+ return 0;
+ }
+
+ @Override
+ public int createEdgeInputSplits() {
+ // Jobs without an edge input format have no edge splits to create
+ if (getConfiguration().hasEdgeInputFormat()) {
+ EdgeInputFormat<I, E> edgeFormat =
+ getConfiguration().createEdgeInputFormat();
+ return createInputSplits(edgeFormat, edgeInputSplitsPaths,
+ "Edge");
+ }
+ return 0;
+ }
+
+ @Override
public List<WorkerInfo> getWorkerInfoList() {
return chosenWorkerInfoList;
}
@@ -1041,7 +1067,10 @@ public class BspServiceMaster<I extends
// 2. Increase the application attempt and set to the correct checkpoint
// 3. Send command to all workers to restart their tasks
try {
- getZkExt().deleteExt(inputSplitsPath, -1, true);
+ getZkExt().deleteExt(vertexInputSplitsPaths.getPath(), -1,
+ true);
+ getZkExt().deleteExt(edgeInputSplitsPaths.getPath(), -1,
+ true);
} catch (InterruptedException e) {
throw new RuntimeException(
"restartFromCheckpoint: InterruptedException", e);
@@ -1250,6 +1279,41 @@ public class BspServiceMaster<I extends
}
}
+ /**
+ * Coordinate the exchange of vertex/edge input splits among workers.
+ * Blocks on barrierOnWorkerList() over the input splits "done" path for
+ * the chosen workers, then creates the "all done" znode.
+ *
+ * @param inputSplitPaths Input split paths
+ * @param inputSplitEvents Input split events
+ * @param inputSplitsType Type of input splits (for logging purposes)
+ * @throws IllegalStateException if a worker failed during input split
+ *         loading, or on ZooKeeper/interrupt errors creating the node
+ */
+ private void coordinateInputSplits(InputSplitPaths inputSplitPaths,
+ InputSplitEvents inputSplitEvents,
+ String inputSplitsType) {
+ // Coordinate the workers finishing sending their vertices/edges to the
+ // correct workers and signal when everything is done.
+ String logPrefix = "coordinate" + inputSplitsType + "InputSplits";
+ if (!barrierOnWorkerList(inputSplitPaths.getDonePath(),
+ chosenWorkerInfoList,
+ inputSplitEvents.getDoneStateChanged())) {
+ throw new IllegalStateException(logPrefix + ": Worker failed during " +
+ "input split (currently not supported)");
+ }
+ try {
+ getZkExt().createExt(inputSplitPaths.getAllDonePath(),
+ null,
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT,
+ false);
+ } catch (KeeperException.NodeExistsException e) {
+ // Presumably created by a master that already ran; safe to continue.
+ LOG.info(logPrefix + ": Node " +
+ inputSplitPaths.getAllDonePath() + " already exists.");
+ } catch (KeeperException e) {
+ throw new IllegalStateException(logPrefix + ": KeeperException", e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(logPrefix + ": InterruptedException", e);
+ }
+ }
+
@Override
public SuperstepState coordinateSuperstep() throws
KeeperException, InterruptedException {
@@ -1316,30 +1380,13 @@ public class BspServiceMaster<I extends
}
if (getSuperstep() == INPUT_SUPERSTEP) {
- // Coordinate the workers finishing sending their vertices to the
- // correct workers and signal when everything is done.
- if (!barrierOnWorkerList(inputSplitsDonePath,
- chosenWorkerInfoList,
- getInputSplitsDoneStateChangedEvent())) {
- throw new IllegalStateException(
- "coordinateSuperstep: Worker failed during input split " +
- "(currently not supported)");
- }
- try {
- getZkExt().createExt(inputSplitsAllDonePath,
- null,
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT,
- false);
- } catch (KeeperException.NodeExistsException e) {
- LOG.info("coordinateInputSplits: Node " +
- inputSplitsAllDonePath + " already exists.");
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "coordinateInputSplits: KeeperException", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "coordinateInputSplits: IllegalStateException", e);
+ if (getConfiguration().hasVertexInputFormat()) {
+ coordinateInputSplits(vertexInputSplitsPaths, vertexInputSplitsEvents,
+ "Vertex");
+ }
+ if (getConfiguration().hasEdgeInputFormat()) {
+ coordinateInputSplits(edgeInputSplitsPaths, edgeInputSplitsEvents,
+ "Edge");
}
}
@@ -1680,6 +1727,8 @@ public class BspServiceMaster<I extends
private class WriteInputSplit implements Callable<Void> {
/** Input split which we are going to write */
private final InputSplit inputSplit;
+ /** Input splits path */
+ private final String inputSplitsPath;
/** Index of the input split */
private final int index;
@@ -1687,10 +1736,14 @@ public class BspServiceMaster<I extends
* Constructor
*
* @param inputSplit Input split which we are going to write
+ * @param inputSplitsPath Input splits path
* @param index Index of the input split
*/
- public WriteInputSplit(InputSplit inputSplit, int index) {
+ public WriteInputSplit(InputSplit inputSplit,
+ String inputSplitsPath,
+ int index) {
this.inputSplit = inputSplit;
+ this.inputSplitsPath = inputSplitsPath;
this.index = index;
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Fri Nov 2 21:37:30 2012
@@ -18,12 +18,7 @@
package org.apache.giraph.graph;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.yammer.metrics.core.Gauge;
-import com.yammer.metrics.core.Timer;
-import com.yammer.metrics.core.TimerContext;
-import net.iharder.Base64;
+
import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.bsp.ApplicationState;
import org.apache.giraph.bsp.CentralizedServiceWorker;
@@ -67,6 +62,13 @@ import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Timer;
+import com.yammer.metrics.core.TimerContext;
+import net.iharder.Base64;
+
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutput;
@@ -217,8 +219,8 @@ public class BspServiceWorker<I extends
}
/**
- * Load the vertices from the user-defined VertexReader into our partitions
- * of vertex ranges. Do this until all the InputSplits have been processed.
+ * Load the vertices/edges from input splits. Do this until all the
+ * InputSplits have been processed.
* All workers will try to do as many InputSplits as they can. The master
* will monitor progress and stop this once all the InputSplits have been
* loaded and check-pointed. Keep track of the last input split path to
@@ -227,47 +229,36 @@ public class BspServiceWorker<I extends
*
* Use one or more threads to do the loading.
*
- * @return Statistics of the vertices loaded
+ * @param inputSplitPathList List of input split paths
+ * @param inputSplitsCallableFactory Factory for {@link InputSplitsCallable}s
+ * @return Statistics of the vertices and edges loaded
* @throws InterruptedException
* @throws KeeperException
*/
- private VertexEdgeCount loadVertices()
- throws InterruptedException, KeeperException {
+ private VertexEdgeCount loadInputSplits(
+ List<String> inputSplitPathList,
+ InputSplitsCallableFactory<I, V, E, M> inputSplitsCallableFactory)
+ throws KeeperException, InterruptedException {
VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
-
- // Get the number of splits first to determine how many threads to use
- List<String> inputSplitPathList =
- getZkExt().getChildrenExt(inputSplitsPath, false, false, true);
-
- GraphState<I, V, E, M> graphState = new GraphState<I, V, E, M>(
- INPUT_SUPERSTEP, 0, 0, getContext(), getGraphMapper(),
- null);
+ // Determine how many threads to use based on the number of input splits
int maxInputSplitThreads =
Math.max(
inputSplitPathList.size() / getConfiguration().getMaxWorkers(), 1);
- int numThreads =
- Math.min(getConfiguration().getNumInputSplitsThreads(),
- maxInputSplitThreads);
+ int numThreads = Math.min(getConfiguration().getNumInputSplitsThreads(),
+ maxInputSplitThreads);
ExecutorService inputSplitsExecutor =
Executors.newFixedThreadPool(numThreads,
new ThreadFactoryBuilder().setNameFormat("load-%d").build());
List<Future<VertexEdgeCount>> threadsFutures =
Lists.newArrayListWithCapacity(numThreads);
if (LOG.isInfoEnabled()) {
- LOG.info("loadVertices: Using " + numThreads + " thread(s), " +
+ LOG.info("loadInputSplits: Using " + numThreads + " thread(s), " +
"originally " + getConfiguration().getNumInputSplitsThreads() +
" threads(s) for " + inputSplitPathList.size() + " total splits.");
}
for (int i = 0; i < numThreads; ++i) {
Callable<VertexEdgeCount> inputSplitsCallable =
- new InputSplitsCallable<I, V, E, M>(
- getContext(),
- graphState,
- getConfiguration(),
- this,
- inputSplitPathList,
- getWorkerInfo(),
- getZkExt());
+ inputSplitsCallableFactory.newCallable();
threadsFutures.add(inputSplitsExecutor.submit(inputSplitsCallable));
}
@@ -284,6 +275,63 @@ public class BspServiceWorker<I extends
return vertexEdgeCount;
}
+
+ /**
+ * Load the vertices from the user-defined {@link VertexReader}, using
+ * every vertex input split listed under the vertex input splits path.
+ *
+ * @return Count of vertices and edges loaded
+ * @throws KeeperException on ZooKeeper errors
+ * @throws InterruptedException if interrupted while loading
+ */
+ private VertexEdgeCount loadVertices() throws KeeperException,
+ InterruptedException {
+ final List<String> splitPaths = getZkExt().getChildrenExt(
+ vertexInputSplitsPaths.getPath(), false, false, true);
+ final GraphState<I, V, E, M> inputGraphState =
+ new GraphState<I, V, E, M>(INPUT_SUPERSTEP, 0, 0, getContext(),
+ getGraphMapper(), null);
+ final VertexInputSplitsCallableFactory<I, V, E, M> callableFactory =
+ new VertexInputSplitsCallableFactory<I, V, E, M>(getContext(),
+ inputGraphState, getConfiguration(), this, splitPaths,
+ getWorkerInfo(), getZkExt());
+ return loadInputSplits(splitPaths, callableFactory);
+ }
+
+ /**
+ * Load the edges from the user-defined {@link EdgeReader}, using every
+ * edge input split listed under the edge input splits path.
+ *
+ * @return Number of edges loaded
+ * @throws KeeperException on ZooKeeper errors
+ * @throws InterruptedException if interrupted while loading
+ */
+ private long loadEdges() throws KeeperException, InterruptedException {
+ final List<String> splitPaths = getZkExt().getChildrenExt(
+ edgeInputSplitsPaths.getPath(), false, false, true);
+ final GraphState<I, V, E, M> inputGraphState =
+ new GraphState<I, V, E, M>(INPUT_SUPERSTEP, 0, 0, getContext(),
+ getGraphMapper(), null);
+ final EdgeInputSplitsCallableFactory<I, V, E, M> callableFactory =
+ new EdgeInputSplitsCallableFactory<I, V, E, M>(getContext(),
+ inputGraphState, getConfiguration(), this, splitPaths,
+ getWorkerInfo(), getZkExt());
+ VertexEdgeCount loaded = loadInputSplits(splitPaths, callableFactory);
+ return loaded.getEdgeCount();
+ }
+
@Override
public WorkerInfo getMasterInfo() {
return masterInfo;
@@ -294,6 +342,80 @@ public class BspServiceWorker<I extends
return workerInfoList;
}
+ /**
+ * Block until the "all ready" znode for these input splits exists.
+ * Each check registers a watch; between checks we wait on the
+ * corresponding all-ready event and reset it.
+ *
+ * @param inputSplitPaths Input split paths
+ * @param inputSplitEvents Input split events
+ */
+ private void ensureInputSplitsReady(InputSplitPaths inputSplitPaths,
+ InputSplitEvents inputSplitEvents) {
+ Stat readyStat = null;
+ while (readyStat == null) {
+ try {
+ readyStat = getZkExt().exists(inputSplitPaths.getAllReadyPath(), true);
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "setup: KeeperException waiting on input splits", e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "setup: InterruptedException waiting on input splits", e);
+ }
+ if (readyStat == null) {
+ inputSplitEvents.getAllReadyChanged().waitForever();
+ inputSplitEvents.getAllReadyChanged().reset();
+ }
+ }
+ }
+
+ /**
+ * Mark this worker as finished with its input splits, then block until
+ * the "all done" znode for these input splits exists (the master creates
+ * it once every worker has finished).
+ *
+ * @param inputSplitPaths Input split paths
+ * @param inputSplitEvents Input split events
+ */
+ private void waitForOtherWorkers(InputSplitPaths inputSplitPaths,
+ InputSplitEvents inputSplitEvents) {
+ // Announce completion as <done dir>/<hostnameId>
+ String doneNode = inputSplitPaths.getDonePath() + "/" +
+ getWorkerInfo().getHostnameId();
+ try {
+ getZkExt().createExt(doneNode, null, Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT, true);
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "setup: KeeperException creating worker done splits", e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "setup: InterruptedException creating worker done splits",
+ e);
+ }
+ // Check (registering a watch) until the all-done node appears
+ Stat allDoneStat = null;
+ while (allDoneStat == null) {
+ try {
+ allDoneStat = getZkExt().exists(inputSplitPaths.getAllDonePath(),
+ true);
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "setup: KeeperException waiting on worker done splits", e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "setup: InterruptedException waiting on worker done splits", e);
+ }
+ if (allDoneStat == null) {
+ inputSplitEvents.getAllDoneChanged().waitForever();
+ inputSplitEvents.getAllDoneChanged().reset();
+ }
+ }
+ }
+
@Override
public FinishedSuperstepStats setup() {
// Unless doing a restart, prepare for computation:
@@ -301,7 +423,8 @@ public class BspServiceWorker<I extends
// 2. Wait until the INPUT_SPLIT_ALL_READY_PATH node has been created
// 3. Process input splits until there are no more.
// 4. Wait until the INPUT_SPLIT_ALL_DONE_PATH node has been created
- // 5. Wait for superstep INPUT_SUPERSTEP to complete.
+ // 5. Process any mutations deriving from add edge requests
+ // 6. Wait for superstep INPUT_SUPERSTEP to complete.
if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
setCachedSuperstep(getRestartedSuperstep());
return new FinishedSuperstepStats(false, -1, -1);
@@ -330,7 +453,7 @@ public class BspServiceWorker<I extends
}
}
- // Add the partitions for that this worker owns
+ // Add the partitions that this worker owns
GraphState<I, V, E, M> graphState =
new GraphState<I, V, E, M>(INPUT_SUPERSTEP, 0, 0,
getContext(), getGraphMapper(), null);
@@ -345,79 +468,54 @@ else[HADOOP_NON_SECURE]*/
workerClient.setup(getConfiguration().authenticate());
/*end[HADOOP_NON_SECURE]*/
- // Ensure the InputSplits are ready for processing before processing
- while (true) {
- Stat inputSplitsReadyStat;
+ VertexEdgeCount vertexEdgeCount;
+
+ if (getConfiguration().hasVertexInputFormat()) {
+ // Ensure the vertex InputSplits are ready for processing
+ ensureInputSplitsReady(vertexInputSplitsPaths, vertexInputSplitsEvents);
+ getContext().progress();
try {
- inputSplitsReadyStat =
- getZkExt().exists(inputSplitsAllReadyPath, true);
+ vertexEdgeCount = loadVertices();
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "setup: loadVertices failed with InterruptedException", e);
} catch (KeeperException e) {
throw new IllegalStateException(
- "setup: KeeperException waiting on input splits", e);
+ "setup: loadVertices failed with KeeperException", e);
+ }
+ getContext().progress();
+ } else {
+ vertexEdgeCount = new VertexEdgeCount();
+ }
+
+ if (getConfiguration().hasEdgeInputFormat()) {
+ // Ensure the edge InputSplits are ready for processing
+ ensureInputSplitsReady(edgeInputSplitsPaths, edgeInputSplitsEvents);
+ getContext().progress();
+ try {
+ vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(0, loadEdges());
} catch (InterruptedException e) {
throw new IllegalStateException(
- "setup: InterruptedException waiting on input splits", e);
- }
- if (inputSplitsReadyStat != null) {
- break;
+ "setup: loadEdges failed with InterruptedException", e);
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "setup: loadEdges failed with KeeperException", e);
}
- getInputSplitsAllReadyEvent().waitForever();
- getInputSplitsAllReadyEvent().reset();
+ getContext().progress();
}
- getContext().progress();
-
- // Load all the vertices and edges and get some stats about what was loaded
- VertexEdgeCount vertexEdgeCount = null;
- try {
- vertexEdgeCount = loadVertices();
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "setup: loadVertices failed with InterruptedException", e);
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "setup: loadVertices failed with KeeperException", e);
- }
if (LOG.isInfoEnabled()) {
- LOG.info("setup: Finally loaded a total of " +
- vertexEdgeCount);
+ LOG.info("setup: Finally loaded a total of " + vertexEdgeCount);
}
- getContext().progress();
- // Workers wait for each other to finish, coordinated by master
- String workerDonePath =
- inputSplitsDonePath + "/" + getWorkerInfo().getHostnameId();
- try {
- getZkExt().createExt(workerDonePath,
- null,
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT,
- true);
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "setup: KeeperException creating worker done splits", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "setup: InterruptedException creating worker done splits", e);
+ if (getConfiguration().hasVertexInputFormat()) {
+ // Workers wait for each other to finish, coordinated by master
+ waitForOtherWorkers(vertexInputSplitsPaths, vertexInputSplitsEvents);
}
- while (true) {
- Stat inputSplitsDoneStat;
- try {
- inputSplitsDoneStat =
- getZkExt().exists(inputSplitsAllDonePath, true);
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "setup: KeeperException waiting on worker done splits", e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "setup: InterruptedException waiting on worker " +
- "done splits", e);
- }
- if (inputSplitsDoneStat != null) {
- break;
- }
- getInputSplitsAllDoneEvent().waitForever();
- getInputSplitsAllDoneEvent().reset();
+
+ if (getConfiguration().hasEdgeInputFormat()) {
+ // Workers wait for each other to finish, coordinated by master
+ waitForOtherWorkers(edgeInputSplitsPaths, edgeInputSplitsEvents);
}
// Create remaining partitions owned by this worker.
@@ -432,6 +530,13 @@ else[HADOOP_NON_SECURE]*/
}
}
+ if (getConfiguration().hasEdgeInputFormat()) {
+ // Create vertices from added edges via vertex resolver.
+ // Doing this at the beginning of superstep 0 is not enough,
+ // because we want the vertex/edge stats to be accurate.
+ workerServer.resolveMutations(graphState);
+ }
+
// Generate the partition stats for the input superstep and process
// if necessary
List<PartitionStats> partitionStatsList =
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspUtils.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspUtils.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspUtils.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspUtils.java Fri Nov 2 21:37:30 2012
@@ -185,6 +185,42 @@ public class BspUtils {
}
/**
+   * Look up the {@link EdgeInputFormat} subclass configured by the user
+   * under {@link GiraphConfiguration#EDGE_INPUT_FORMAT_CLASS}.
+   *
+   * @param <I> Vertex id
+   * @param <E> Edge data
+   * @param conf Configuration to read the class name from
+   * @return Configured edge input format class (null when none is set,
+   *         since null is passed as the default)
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public static <I extends WritableComparable, E extends Writable>
+  Class<? extends EdgeInputFormat<I, E>>
+  getEdgeInputFormatClass(Configuration conf) {
+    Class<? extends EdgeInputFormat> rawClass =
+        conf.getClass(GiraphConfiguration.EDGE_INPUT_FORMAT_CLASS, null,
+            EdgeInputFormat.class);
+    // The configuration only knows the raw type; narrow it to I/E here.
+    return (Class<? extends EdgeInputFormat<I, E>>) rawClass;
+  }
+
+  /**
+   * Instantiate the user's configured {@link EdgeInputFormat} via
+   * Hadoop reflection, handing it the configuration.
+   *
+   * @param <I> Vertex id
+   * @param <E> Edge data
+   * @param conf Configuration to read the class name from
+   * @return New instance of the user's edge input format
+   */
+  @SuppressWarnings("rawtypes")
+  public static <I extends WritableComparable, E extends Writable>
+  EdgeInputFormat<I, E> createEdgeInputFormat(Configuration conf) {
+    Class<? extends EdgeInputFormat<I, E>> formatClass =
+        getEdgeInputFormatClass(conf);
+    return ReflectionUtils.newInstance(formatClass, conf);
+  }
+
+ /**
* Get the user's subclassed {@link AggregatorWriter}.
*
* @param conf Configuration to check
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeInputFormat.java?rev=1405175&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeInputFormat.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeInputFormat.java Fri Nov 2 21:37:30 2012
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Input format for reading single edges.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge data
+ */
+public abstract class EdgeInputFormat<I extends WritableComparable,
+    E extends Writable> implements GiraphInputFormat {
+  /**
+   * Logically split the edges for a graph processing application.
+   *
+   * Each {@link InputSplit} is then assigned to a worker for processing.
+   *
+   * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
+   * input files are not physically split into chunks. For example, a split
+   * could be an <i>&lt;input-file-path, start, offset&gt;</i> tuple. The
+   * input format also creates the {@link EdgeReader} used to read each
+   * {@link InputSplit} (see {@link #createEdgeReader}).
+   *
+   * Also, the number of workers is a hint given to the developer to try to
+   * intelligently determine how many splits to create (if this is
+   * adjustable) at runtime.
+   *
+   * @param context Context of the job
+   * @param numWorkers Number of workers used for this job
+   * @return a list of {@link InputSplit}s for the job.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Override
+  public abstract List<InputSplit> getSplits(
+      JobContext context, int numWorkers) throws IOException,
+      InterruptedException;
+
+  /**
+   * Create an edge reader for a given split. The framework will call
+   * {@link EdgeReader#initialize(InputSplit, TaskAttemptContext)} before
+   * the split is used.
+   *
+   * @param split the split to be read
+   * @param context the information about the task
+   * @return a new edge reader
+   * @throws IOException if the reader cannot be created
+   */
+  public abstract EdgeReader<I, E> createEdgeReader(
+      InputSplit split,
+      TaskAttemptContext context) throws IOException;
+}
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeInputSplitsCallable.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeInputSplitsCallable.java?rev=1405175&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeInputSplitsCallable.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeInputSplitsCallable.java Fri Nov 2 21:37:30 2012
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.metrics.GiraphMetrics;
+import org.apache.giraph.metrics.MetricGroup;
+import org.apache.giraph.utils.LoggerUtils;
+import org.apache.giraph.utils.MemoryUtils;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+import com.yammer.metrics.core.Counter;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Load as many edge input splits as possible.
+ * Every thread will have its own instance of WorkerClientRequestProcessor
+ * to send requests.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public class EdgeInputSplitsCallable<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends InputSplitsCallable<I, V, E, M> {
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(
+      EdgeInputSplitsCallable.class);
+  /** Report status/log progress each time this many edges are loaded */
+  private static final long EDGES_PER_STATUS_UPDATE = 1000000;
+  /** Total edges loaded by this callable across all of its input splits */
+  private long totalEdgesLoaded = 0;
+  /** Input split max edges (any non-positive value means no limit) */
+  private final long inputSplitMaxEdges;
+
+  // Metrics
+  /** number of edges loaded counter */
+  private final Counter edgesLoadedCounter;
+
+  /**
+   * Constructor.
+   *
+   * @param context Context
+   * @param graphState Graph state
+   * @param configuration Configuration
+   * @param bspServiceWorker service worker
+   * @param inputSplitPathList List of the paths of the input splits
+   * @param workerInfo This worker's info
+   * @param zooKeeperExt Handle to ZooKeeperExt
+   */
+  public EdgeInputSplitsCallable(
+      Mapper<?, ?, ?, ?>.Context context,
+      GraphState<I, V, E, M> graphState,
+      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
+      BspServiceWorker<I, V, E, M> bspServiceWorker,
+      List<String> inputSplitPathList,
+      WorkerInfo workerInfo,
+      ZooKeeperExt zooKeeperExt) {
+    super(context, graphState, configuration, bspServiceWorker,
+        inputSplitPathList, workerInfo, zooKeeperExt,
+        BspServiceWorker.EDGE_INPUT_SPLIT_RESERVED_NODE,
+        BspServiceWorker.EDGE_INPUT_SPLIT_FINISHED_NODE,
+        bspServiceWorker.edgeInputSplitsEvents);
+
+    inputSplitMaxEdges = configuration.getInputSplitMaxEdges();
+
+    // Initialize Metrics
+    edgesLoadedCounter = GiraphMetrics.getCounter(MetricGroup.IO,
+        "edges-loaded");
+  }
+
+  /**
+   * Read edges from input split and send each one as an add-edge request
+   * for its source vertex. If testing, the user may request a maximum
+   * number of edges to be read from an input split.
+   *
+   * @param inputSplit Input split to process with edge reader
+   * @param graphState Current graph state
+   * @return Edges loaded from this input split (vertex count is always 0)
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws IllegalArgumentException if the reader returns an edge with a
+   *         null source id, target id or value
+   */
+  @Override
+  protected VertexEdgeCount readInputSplit(
+      InputSplit inputSplit,
+      GraphState<I, V, E, M> graphState) throws IOException,
+      InterruptedException {
+    EdgeInputFormat<I, E> edgeInputFormat =
+        configuration.createEdgeInputFormat();
+    EdgeReader<I, E> edgeReader =
+        edgeInputFormat.createEdgeReader(inputSplit, context);
+    edgeReader.initialize(inputSplit, context);
+    long inputSplitEdgesLoaded = 0;
+    try {
+      while (edgeReader.nextEdge()) {
+        EdgeWithSource<I, E> readerEdge = edgeReader.getCurrentEdge();
+        if (readerEdge.getSourceVertexId() == null) {
+          throw new IllegalArgumentException(
+              "readInputSplit: Edge reader returned an edge " +
+                  "without a source vertex id! - " + readerEdge);
+        }
+        if (readerEdge.getEdge().getTargetVertexId() == null) {
+          throw new IllegalArgumentException(
+              "readInputSplit: Edge reader returned an edge " +
+                  "without a target vertex id! - " + readerEdge);
+        }
+        if (readerEdge.getEdge().getValue() == null) {
+          throw new IllegalArgumentException(
+              "readInputSplit: Edge reader returned an edge " +
+                  "without a value! - " + readerEdge);
+        }
+
+        graphState.getWorkerClientRequestProcessor().addEdgeRequest(
+            readerEdge.getSourceVertexId(), readerEdge.getEdge());
+        context.progress(); // do this before potential data transfer
+        ++inputSplitEdgesLoaded;
+
+        // Update status every EDGES_PER_STATUS_UPDATE edges
+        long edgesSoFar = inputSplitEdgesLoaded + totalEdgesLoaded;
+        if ((edgesSoFar % EDGES_PER_STATUS_UPDATE) == 0) {
+          LoggerUtils.setStatusAndLog(context, LOG, Level.INFO,
+              "readInputSplit: Loaded " + edgesSoFar + " edges " +
+                  MemoryUtils.getRuntimeMemoryStats());
+        }
+
+        // For sampling, or to limit outlier input splits, the number of
+        // records per input split can be limited
+        if (inputSplitMaxEdges > 0 &&
+            inputSplitEdgesLoaded >= inputSplitMaxEdges) {
+          if (LOG.isInfoEnabled()) {
+            LOG.info("readInputSplit: Leaving the input " +
+                "split early, reached maximum edges " +
+                inputSplitEdgesLoaded);
+          }
+          break;
+        }
+      }
+    } finally {
+      // Release the reader even when reading or validation fails part-way;
+      // previously an exception in the loop leaked it.
+      edgeReader.close();
+    }
+    totalEdgesLoaded += inputSplitEdgesLoaded;
+    edgesLoadedCounter.inc(inputSplitEdgesLoaded);
+    return new VertexEdgeCount(0, inputSplitEdgesLoaded);
+  }
+}
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeInputSplitsCallableFactory.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeInputSplitsCallableFactory.java?rev=1405175&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeInputSplitsCallableFactory.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeInputSplitsCallableFactory.java Fri Nov 2 21:37:30 2012
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import java.util.List;
+
+/**
+ * Factory for {@link EdgeInputSplitsCallable}s.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public class EdgeInputSplitsCallableFactory<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable>
+ implements InputSplitsCallableFactory<I, V, E, M> {
+ /** Mapper context. */
+ private final Mapper<?, ?, ?, ?>.Context context;
+ /** Graph state. */
+ private final GraphState<I, V, E, M> graphState;
+ /** Configuration. */
+ private final ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
+ /** {@link BspServiceWorker} we're running on. */
+ private final BspServiceWorker<I, V, E, M> bspServiceWorker;
+ /** List of input split paths. */
+ private final List<String> inputSplitPathList;
+ /** Worker info. */
+ private final WorkerInfo workerInfo;
+ /** {@link ZooKeeperExt} for this worker. */
+ private final ZooKeeperExt zooKeeperExt;
+
+ /**
+ * Constructor.
+ *
+ * @param context Mapper context
+ * @param graphState Graph state
+ * @param configuration Configuration
+ * @param bspServiceWorker Calling {@link BspServiceWorker}
+ * @param inputSplitPathList List of input split paths
+ * @param workerInfo Worker info
+ * @param zooKeeperExt {@link ZooKeeperExt} for this worker
+ */
+ public EdgeInputSplitsCallableFactory(
+ Mapper<?, ?, ?, ?>.Context context,
+ GraphState<I, V, E, M> graphState,
+ ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
+ BspServiceWorker<I, V, E, M> bspServiceWorker,
+ List<String> inputSplitPathList,
+ WorkerInfo workerInfo,
+ ZooKeeperExt zooKeeperExt) {
+ this.context = context;
+ this.graphState = graphState;
+ this.configuration = configuration;
+ this.bspServiceWorker = bspServiceWorker;
+ this.inputSplitPathList = inputSplitPathList;
+ this.workerInfo = workerInfo;
+ this.zooKeeperExt = zooKeeperExt;
+ }
+
+ @Override
+ public InputSplitsCallable<I, V, E, M> newCallable() {
+ return new EdgeInputSplitsCallable<I, V, E, M>(
+ context,
+ graphState,
+ configuration,
+ bspServiceWorker,
+ inputSplitPathList,
+ workerInfo,
+ zooKeeperExt);
+ }
+}