You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ap...@apache.org on 2012/11/02 22:37:31 UTC
svn commit: r1405175 [2/3] - in /giraph/trunk: ./
giraph/src/main/java/org/apache/giraph/
giraph/src/main/java/org/apache/giraph/bsp/
giraph/src/main/java/org/apache/giraph/comm/
giraph/src/main/java/org/apache/giraph/comm/netty/ giraph/src/main/java/o...
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeReader.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeReader.java?rev=1405175&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeReader.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeReader.java Fri Nov 2 21:37:30 2012
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Analogous to {@link org.apache.hadoop.mapreduce.RecordReader} for edges.
+ * Reads the edges from an input split.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge data
+ */
+@SuppressWarnings("rawtypes")
+public interface EdgeReader<I extends WritableComparable, E extends Writable> {
+  /**
+   * Use the input split and context to set up reading the edges.
+   * Guaranteed to be called prior to any other function.
+   *
+   * @param inputSplit Input split to be used for reading edges.
+   * @param context Context from the task.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  void initialize(InputSplit inputSplit, TaskAttemptContext context) throws
+      IOException, InterruptedException;
+
+  /**
+   * Read the next edge. Must be called before {@link #getCurrentEdge()}.
+   *
+   * @return false iff there are no more edges
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  boolean nextEdge() throws IOException, InterruptedException;
+
+  /**
+   * Get the current edge, i.e. the edge most recently read by
+   * {@link #nextEdge()}. Only meaningful after a call to
+   * {@link #nextEdge()} that returned true.
+   *
+   * @return the current edge which has been read.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  EdgeWithSource<I, E> getCurrentEdge() throws IOException,
+      InterruptedException;
+
+  /**
+   * Close this {@link EdgeReader} to future operations.
+   *
+   * @throws IOException
+   */
+  void close() throws IOException;
+
+  /**
+   * How much of the input has the {@link EdgeReader} consumed, i.e. how
+   * much of it has been processed?
+   *
+   * @return Progress from <code>0.0</code> to <code>1.0</code>.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  float getProgress() throws IOException, InterruptedException;
+}
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeWithSource.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeWithSource.java?rev=1405175&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeWithSource.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeWithSource.java Fri Nov 2 21:37:30 2012
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * A pair of source vertex id and {@link Edge} object (that is, all the
+ * information about an edge). Both references are stored exactly as passed
+ * to the constructor; no defensive copies are made.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge data
+ */
+@SuppressWarnings("rawtypes")
+public class EdgeWithSource<I extends WritableComparable, E extends Writable> {
+  /** Source vertex id. */
+  private final I sourceVertexId;
+  /** Edge. */
+  private final Edge<I, E> edge;
+
+  /**
+   * Constructor.
+   *
+   * @param sourceVertexId Source vertex id
+   * @param edge Edge
+   */
+  public EdgeWithSource(I sourceVertexId, Edge<I, E> edge) {
+    this.sourceVertexId = sourceVertexId;
+    this.edge = edge;
+  }
+
+  /**
+   * Get the source vertex id.
+   *
+   * @return Source vertex id (the same reference given to the constructor).
+   */
+  public I getSourceVertexId() {
+    return sourceVertexId;
+  }
+
+  /**
+   * Get the edge object.
+   *
+   * @return The edge (the same reference given to the constructor).
+   */
+  public Edge<I, E> getEdge() {
+    return edge;
+  }
+}
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GiraphInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GiraphInputFormat.java?rev=1405175&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GiraphInputFormat.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GiraphInputFormat.java Fri Nov 2 21:37:30 2012
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Common interface for {@link VertexInputFormat} and {@link EdgeInputFormat}.
+ * Both kinds of input format expose their input splits through
+ * {@link #getSplits(JobContext, int)}.
+ */
+public interface GiraphInputFormat {
+ /**
+ * Get the list of input splits for the format.
+ *
+ * @param context The job context
+ * @param numWorkers Number of workers (NOTE(review): presumably a hint for
+ * how many splits to produce -- confirm against implementations)
+ * @return The list of input splits
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ List<InputSplit> getSplits(JobContext context, int numWorkers)
+ throws IOException, InterruptedException;
+}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GiraphJob.java Fri Nov 2 21:37:30 2012
@@ -153,9 +153,16 @@ public class GiraphJob {
throw new IllegalArgumentException("checkConfiguration: Null" +
GiraphConfiguration.VERTEX_CLASS);
}
- if (conf.getVertexInputFormatClass() == null) {
- throw new IllegalArgumentException("checkConfiguration: Null " +
- GiraphConfiguration.VERTEX_INPUT_FORMAT_CLASS);
+ if (conf.getVertexInputFormatClass() == null &&
+ conf.getEdgeInputFormatClass() == null) {
+ throw new IllegalArgumentException("checkConfiguration: One of " +
+ GiraphConfiguration.VERTEX_INPUT_FORMAT_CLASS + " and " +
+ GiraphConfiguration.EDGE_INPUT_FORMAT_CLASS + " must be non-null");
+ }
+ if (conf.getEdgeInputFormatClass() != null &&
+ !(MutableVertex.class.isAssignableFrom(conf.getVertexClass()))) {
+ throw new IllegalArgumentException("checkConfiguration: EdgeInputFormat" +
+ " only works with mutable vertices");
}
if (conf.getVertexResolverClass() == null) {
if (LOG.isInfoEnabled()) {
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java Fri Nov 2 21:37:30 2012
@@ -18,10 +18,6 @@
package org.apache.giraph.graph;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.yammer.metrics.core.Timer;
-import com.yammer.metrics.core.TimerContext;
import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.bsp.CentralizedServiceMaster;
@@ -46,6 +42,11 @@ import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.yammer.metrics.core.Timer;
+import com.yammer.metrics.core.TimerContext;
+
import java.io.IOException;
import java.lang.reflect.Type;
import java.net.URL;
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitEvents.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitEvents.java?rev=1405175&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitEvents.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitEvents.java Fri Nov 2 21:37:30 2012
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.giraph.zk.BspEvent;
+import org.apache.giraph.zk.PredicateLock;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Simple container of the {@link BspEvent}s used to coordinate input split
+ * processing. Each event is backed by its own {@link PredicateLock}.
+ */
+public class InputSplitEvents {
+  /** Event: input splits are ready for consumption by workers */
+  private final BspEvent allReadyChanged;
+  /** Event: input split reservation or finished notification */
+  private final BspEvent stateChanged;
+  /** Event: input splits are done being processed by workers */
+  private final BspEvent allDoneChanged;
+  /** Event: a worker finished processing an input split */
+  private final BspEvent doneStateChanged;
+
+  /**
+   * Constructor. Creates four independent events that all report progress
+   * through the same {@link Progressable}.
+   *
+   * @param progressable {@link Progressable} to report progress
+   */
+  public InputSplitEvents(Progressable progressable) {
+    allReadyChanged = newEvent(progressable);
+    stateChanged = newEvent(progressable);
+    allDoneChanged = newEvent(progressable);
+    doneStateChanged = newEvent(progressable);
+  }
+
+  /**
+   * Create a fresh event backed by a {@link PredicateLock}.
+   *
+   * @param progressable {@link Progressable} to report progress
+   * @return New, independent {@link BspEvent}
+   */
+  private static BspEvent newEvent(Progressable progressable) {
+    return new PredicateLock(progressable);
+  }
+
+  /**
+   * Event signalling that the input splits are all ready.
+   *
+   * @return {@link BspEvent} for input splits all ready
+   */
+  public BspEvent getAllReadyChanged() {
+    return allReadyChanged;
+  }
+
+  /**
+   * Event signalling a change in input split state (reserved/finished).
+   *
+   * @return {@link BspEvent} for input splits state
+   */
+  public BspEvent getStateChanged() {
+    return stateChanged;
+  }
+
+  /**
+   * Event signalling that the input splits are all done.
+   *
+   * @return {@link BspEvent} for input splits all done
+   */
+  public BspEvent getAllDoneChanged() {
+    return allDoneChanged;
+  }
+
+  /**
+   * Event signalling that an input split has been done by a worker.
+   *
+   * @return {@link BspEvent} for input split done
+   */
+  public BspEvent getDoneStateChanged() {
+    return doneStateChanged;
+  }
+}
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitPaths.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitPaths.java?rev=1405175&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitPaths.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitPaths.java Fri Nov 2 21:37:30 2012
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+/**
+ * Simple container of input split paths for coordination via ZooKeeper.
+ * Every path is the base path with the corresponding node/dir appended.
+ */
+public class InputSplitPaths {
+  /** Path to the input splits written by the master */
+  private final String path;
+  /** Path to the input splits all ready to be processed by workers */
+  private final String allReadyPath;
+  /** Path to the input splits done */
+  private final String donePath;
+  /** Path to the input splits all done to notify the workers to proceed */
+  private final String allDonePath;
+
+  /**
+   * Constructor. Each argument after {@code basePath} is appended verbatim
+   * to {@code basePath} (plain string concatenation, no separator added).
+   *
+   * @param basePath Base path
+   * @param dir Input splits path
+   * @param doneDir Input split done path
+   * @param allReadyNode Input splits all ready path
+   * @param allDoneNode Input splits all done path
+   */
+  public InputSplitPaths(String basePath,
+                         String dir,
+                         String doneDir,
+                         String allReadyNode,
+                         String allDoneNode) {
+    this.path = basePath + dir;
+    this.donePath = basePath + doneDir;
+    this.allReadyPath = basePath + allReadyNode;
+    this.allDonePath = basePath + allDoneNode;
+  }
+
+  /**
+   * Get path to the input splits.
+   *
+   * @return Path to input splits
+   */
+  public String getPath() {
+    return path;
+  }
+
+  /**
+   * Get path to the input splits all ready.
+   *
+   * @return Path to input splits all ready
+   */
+  public String getAllReadyPath() {
+    return allReadyPath;
+  }
+
+  /**
+   * Get path to the input splits done.
+   *
+   * @return Path to input splits done
+   */
+  public String getDonePath() {
+    return donePath;
+  }
+
+  /**
+   * Get path to the input splits all done.
+   *
+   * @return Path to input splits all done
+   */
+  public String getAllDonePath() {
+    return allDonePath;
+  }
+}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java Fri Nov 2 21:37:30 2012
@@ -15,23 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.giraph.graph;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.Callable;
+package org.apache.giraph.graph;
-import com.yammer.metrics.core.Counter;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.comm.WorkerClientRequestProcessor;
import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
-import org.apache.giraph.graph.partition.PartitionOwner;
-import org.apache.giraph.metrics.GiraphMetrics;
-import org.apache.giraph.metrics.MetricGroup;
-import org.apache.giraph.utils.LoggerUtils;
-import org.apache.giraph.utils.MemoryUtils;
import org.apache.giraph.utils.SystemTime;
import org.apache.giraph.utils.Time;
import org.apache.giraph.zk.ZooKeeperExt;
@@ -41,15 +30,20 @@ import org.apache.hadoop.io.WritableComp
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Callable;
+
/**
- * Load as many input splits as possible.
+ * Abstract base class for loading vertex/edge input splits.
* Every thread will has its own instance of WorkerClientRequestProcessor
* to send requests.
*
@@ -58,7 +52,7 @@ import org.apache.zookeeper.data.Stat;
* @param <E> Edge value
* @param <M> Message data
*/
-public class InputSplitsCallable<I extends WritableComparable,
+public abstract class InputSplitsCallable<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
implements Callable<VertexEdgeCount> {
/** Class logger */
@@ -66,8 +60,11 @@ public class InputSplitsCallable<I exten
Logger.getLogger(InputSplitsCallable.class);
/** Class time object */
private static final Time TIME = SystemTime.getInstance();
+ /** Configuration */
+ protected final ImmutableClassesGiraphConfiguration<I, V, E, M>
+ configuration;
/** Context */
- private final Mapper<?, ?, ?, ?>.Context context;
+ protected final Mapper<?, ?, ?, ?>.Context context;
/** Graph state */
private final GraphState<I, V, E, M> graphState;
/** Handles IPC communication */
@@ -80,26 +77,16 @@ public class InputSplitsCallable<I exten
private final InputSplitPathOrganizer splitOrganizer;
/** ZooKeeperExt handle */
private final ZooKeeperExt zooKeeperExt;
- /** Configuration */
- private final ImmutableClassesGiraphConfiguration<I, V, E, M>
- configuration;
- /** Total vertices loaded */
- private long totalVerticesLoaded = 0;
- /** Total edges loaded */
- private long totalEdgesLoaded = 0;
- /** Input split max vertices (-1 denotes all) */
- private final long inputSplitMaxVertices;
- /** Bsp service worker (only use thread-safe methods) */
- private final BspServiceWorker<I, V, E, M> bspServiceWorker;
/** Get the start time in nanos */
private final long startNanos = TIME.getNanoseconds();
+ /** ZooKeeper input split reserved node. */
+ private final String inputSplitReservedNode;
+ /** ZooKeeper input split finished node. */
+ private final String inputSplitFinishedNode;
+ /** Input split events. */
+ private final InputSplitEvents inputSplitEvents;
- // Metrics
- /** number of vertices loaded counter */
- private final Counter verticesLoadedCounter;
- /** number of edges loaded counter */
- private final Counter edgesLoadedCounter;
-
+ // CHECKSTYLE: stop ParameterNumberCheck
/**
* Constructor.
*
@@ -110,14 +97,21 @@ public class InputSplitsCallable<I exten
* @param inputSplitPathList List of the paths of the input splits
* @param workerInfo This worker's info
* @param zooKeeperExt Handle to ZooKeeperExt
+ * @param inputSplitReservedNode Path to input split reserved
+ * @param inputSplitFinishedNode Path to input split finished
+ * @param inputSplitEvents Input split events
*/
public InputSplitsCallable(
- Mapper<?, ?, ?, ?>.Context context, GraphState<I, V, E, M> graphState,
+ Mapper<?, ?, ?, ?>.Context context,
+ GraphState<I, V, E, M> graphState,
ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
BspServiceWorker<I, V, E, M> bspServiceWorker,
List<String> inputSplitPathList,
WorkerInfo workerInfo,
- ZooKeeperExt zooKeeperExt) {
+ ZooKeeperExt zooKeeperExt,
+ String inputSplitReservedNode,
+ String inputSplitFinishedNode,
+ InputSplitEvents inputSplitEvents) {
this.zooKeeperExt = zooKeeperExt;
this.context = context;
this.workerClientRequestProcessor =
@@ -137,25 +131,35 @@ public class InputSplitsCallable<I exten
"InputSplitsCallable: InterruptedException", e);
}
this.configuration = configuration;
- inputSplitMaxVertices = configuration.getInputSplitMaxVertices();
- this.bspServiceWorker = bspServiceWorker;
-
- // Initialize Metrics
- verticesLoadedCounter = GiraphMetrics.getCounter(MetricGroup.IO,
- "vertices-loaded");
- edgesLoadedCounter = GiraphMetrics.getCounter(MetricGroup.IO,
- "edges-loaded");
+ this.inputSplitReservedNode = inputSplitReservedNode;
+ this.inputSplitFinishedNode = inputSplitFinishedNode;
+ this.inputSplitEvents = inputSplitEvents;
}
+ // CHECKSTYLE: resume ParameterNumberCheck
+
+ /**
+ * Load vertices/edges from the given input split.
+ * Invoked by loadInputSplit() once per input split that this callable
+ * has reserved, after the split has been deserialized from ZooKeeper;
+ * the split is marked finished in ZooKeeper after this method returns.
+ *
+ * @param inputSplit Input split to load
+ * @param graphState Graph state
+ * @return Count of vertices and edges loaded
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ protected abstract VertexEdgeCount readInputSplit(
+ InputSplit inputSplit,
+ GraphState<I, V, E, M> graphState)
+ throws IOException, InterruptedException;
@Override
public VertexEdgeCount call() {
VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
- String inputSplitPath = null;
+ String inputSplitPath;
int inputSplitsProcessed = 0;
try {
while ((inputSplitPath = reserveInputSplit()) != null) {
vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(
- loadVerticesFromInputSplit(inputSplitPath,
+ loadInputSplit(inputSplitPath,
graphState));
context.progress();
++inputSplitsProcessed;
@@ -212,13 +216,13 @@ public class InputSplitsCallable<I exten
private String reserveInputSplit()
throws KeeperException, InterruptedException {
String reservedInputSplitPath = null;
- Stat reservedStat = null;
+ Stat reservedStat;
while (true) {
int reservedInputSplits = 0;
for (String nextSplitToClaim : splitOrganizer) {
context.progress();
- String tmpInputSplitReservedPath =
- nextSplitToClaim + BspServiceWorker.INPUT_SPLIT_RESERVED_NODE;
+ String tmpInputSplitReservedPath = nextSplitToClaim +
+ inputSplitReservedNode;
reservedStat =
zooKeeperExt.exists(tmpInputSplitReservedPath, true);
if (reservedStat == null) {
@@ -270,9 +274,9 @@ public class InputSplitsCallable<I exten
// Wait for either a reservation to go away or a notification that
// an InputSplit has finished.
context.progress();
- bspServiceWorker.getInputSplitsStateChangedEvent().waitMsecs(
+ inputSplitEvents.getStateChanged().waitMsecs(
60 * 1000);
- bspServiceWorker.getInputSplitsStateChangedEvent().reset();
+ inputSplitEvents.getStateChanged().reset();
}
}
@@ -285,7 +289,7 @@ public class InputSplitsCallable<I exten
*/
private void markInputSplitPathFinished(String inputSplitPath) {
String inputSplitFinishedPath =
- inputSplitPath + BspServiceWorker.INPUT_SPLIT_FINISHED_NODE;
+ inputSplitPath + inputSplitFinishedNode;
try {
zooKeeperExt.createExt(inputSplitFinishedPath,
null,
@@ -293,15 +297,15 @@ public class InputSplitsCallable<I exten
CreateMode.PERSISTENT,
true);
} catch (KeeperException.NodeExistsException e) {
- LOG.warn("loadVertices: " + inputSplitFinishedPath +
+ LOG.warn("markInputSplitPathFinished: " + inputSplitFinishedPath +
" already exists!");
} catch (KeeperException e) {
throw new IllegalStateException(
- "loadVertices: KeeperException on " +
+ "markInputSplitPathFinished: KeeperException on " +
inputSplitFinishedPath, e);
} catch (InterruptedException e) {
throw new IllegalStateException(
- "loadVertices: InterruptedException on " +
+ "markInputSplitPathFinished: InterruptedException on " +
inputSplitFinishedPath, e);
}
}
@@ -321,16 +325,16 @@ public class InputSplitsCallable<I exten
* @throws InstantiationException
* @throws IllegalAccessException
*/
- private VertexEdgeCount loadVerticesFromInputSplit(
+ private VertexEdgeCount loadInputSplit(
String inputSplitPath,
GraphState<I, V, E, M> graphState)
throws IOException, ClassNotFoundException, InterruptedException,
InstantiationException, IllegalAccessException {
- InputSplit inputSplit = getInputSplitForVertices(inputSplitPath);
+ InputSplit inputSplit = getInputSplit(inputSplitPath);
VertexEdgeCount vertexEdgeCount =
- readVerticesFromInputSplit(inputSplit, graphState);
+ readInputSplit(inputSplit, graphState);
if (LOG.isInfoEnabled()) {
- LOG.info("loadVerticesFromInputSplit: Finished loading " +
+ LOG.info("loadFromInputSplit: Finished loading " +
inputSplitPath + " " + vertexEdgeCount);
}
markInputSplitPathFinished(inputSplitPath);
@@ -339,24 +343,24 @@ public class InputSplitsCallable<I exten
/**
* Talk to ZooKeeper to convert the input split path to the actual
- * InputSplit containing the vertices to read.
+ * InputSplit.
*
* @param inputSplitPath Location in ZK of input split
- * @return instance of InputSplit containing vertices to read
+ * @return instance of InputSplit
* @throws IOException
* @throws ClassNotFoundException
*/
- private InputSplit getInputSplitForVertices(String inputSplitPath)
+ protected InputSplit getInputSplit(String inputSplitPath)
throws IOException, ClassNotFoundException {
byte[] splitList;
try {
splitList = zooKeeperExt.getData(inputSplitPath, false, null);
} catch (KeeperException e) {
throw new IllegalStateException(
- "loadVertices: KeeperException on " + inputSplitPath, e);
+ "getInputSplit: KeeperException on " + inputSplitPath, e);
} catch (InterruptedException e) {
throw new IllegalStateException(
- "loadVertices: IllegalStateException on " + inputSplitPath, e);
+ "getInputSplit: IllegalStateException on " + inputSplitPath, e);
}
context.progress();
@@ -371,85 +375,10 @@ public class InputSplitsCallable<I exten
((Writable) inputSplit).readFields(inputStream);
if (LOG.isInfoEnabled()) {
- LOG.info("getInputSplitForVertices: Reserved " + inputSplitPath +
+ LOG.info("getInputSplit: Reserved " + inputSplitPath +
" from ZooKeeper and got input split '" +
inputSplit.toString() + "'");
}
return inputSplit;
}
-
- /**
- * Read vertices from input split. If testing, the user may request a
- * maximum number of vertices to be read from an input split.
- *
- * @param inputSplit Input split to process with vertex reader
- * @param graphState Current graph state
- * @return Vertices and edges loaded from this input split
- * @throws IOException
- * @throws InterruptedException
- */
- private VertexEdgeCount readVerticesFromInputSplit(
- InputSplit inputSplit,
- GraphState<I, V, E, M> graphState)
- throws IOException, InterruptedException {
- VertexInputFormat<I, V, E, M> vertexInputFormat =
- configuration.createVertexInputFormat();
- VertexReader<I, V, E, M> vertexReader =
- vertexInputFormat.createVertexReader(inputSplit, context);
- vertexReader.initialize(inputSplit, context);
- long inputSplitVerticesLoaded = 0;
- long inputSplitEdgesLoaded = 0;
- while (vertexReader.nextVertex()) {
- Vertex<I, V, E, M> readerVertex =
- vertexReader.getCurrentVertex();
- if (readerVertex.getId() == null) {
- throw new IllegalArgumentException(
- "readVerticesFromInputSplit: Vertex reader returned a vertex " +
- "without an id! - " + readerVertex);
- }
- if (readerVertex.getValue() == null) {
- readerVertex.setValue(configuration.createVertexValue());
- }
- readerVertex.setConf(configuration);
- readerVertex.setGraphState(graphState);
-
- PartitionOwner partitionOwner =
- bspServiceWorker.getVertexPartitionOwner(readerVertex.getId());
- graphState.getWorkerClientRequestProcessor().sendVertexRequest(
- partitionOwner, readerVertex);
- context.progress(); // do this before potential data transfer
- ++inputSplitVerticesLoaded;
- inputSplitEdgesLoaded += readerVertex.getNumEdges();
-
- // Update status every 250k vertices
- if (((inputSplitVerticesLoaded + totalVerticesLoaded) % 250000) == 0) {
- LoggerUtils.setStatusAndLog(context, LOG, Level.INFO,
- "readVerticesFromInputSplit: Loaded " +
- (inputSplitVerticesLoaded + totalVerticesLoaded) +
- " vertices " +
- (inputSplitEdgesLoaded + totalEdgesLoaded) + " edges " +
- MemoryUtils.getRuntimeMemoryStats());
- }
-
- // For sampling, or to limit outlier input splits, the number of
- // records per input split can be limited
- if (inputSplitMaxVertices > 0 &&
- inputSplitVerticesLoaded >= inputSplitMaxVertices) {
- if (LOG.isInfoEnabled()) {
- LOG.info("readVerticesFromInputSplit: Leaving the input " +
- "split early, reached maximum vertices " +
- inputSplitVerticesLoaded);
- }
- break;
- }
- }
- vertexReader.close();
- totalVerticesLoaded += inputSplitVerticesLoaded;
- verticesLoadedCounter.inc(inputSplitVerticesLoaded);
- totalEdgesLoaded += inputSplitEdgesLoaded;
- edgesLoadedCounter.inc(inputSplitEdgesLoaded);
- return new VertexEdgeCount(
- inputSplitVerticesLoaded, inputSplitEdgesLoaded);
- }
}
-
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallableFactory.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallableFactory.java?rev=1405175&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallableFactory.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallableFactory.java Fri Nov 2 21:37:30 2012
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Factory class for creating {@link InputSplitsCallable}s.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public interface InputSplitsCallableFactory<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable> {
+ /**
+ * Return a newly-created {@link InputSplitsCallable}.
+ *
+ * @return A new {@link InputSplitsCallable}
+ */
+ InputSplitsCallable<I, V, E, M> newCallable();
+}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterThread.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterThread.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterThread.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterThread.java Fri Nov 2 21:37:30 2012
@@ -18,11 +18,6 @@
package org.apache.giraph.graph;
-import java.io.IOException;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-
import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.bsp.ApplicationState;
import org.apache.giraph.bsp.CentralizedServiceMaster;
@@ -33,6 +28,11 @@ import org.apache.hadoop.mapreduce.Mappe
import org.apache.log4j.Logger;
import org.apache.zookeeper.KeeperException;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
/**
* Master thread that will coordinate the activities of the tasks. It runs
* on all task processes, however, will only execute its algorithm if it knows
@@ -98,7 +98,8 @@ public class MasterThread<I extends Writ
// Attempt to create InputSplits if necessary. Bail out if that fails.
if (bspServiceMaster.getRestartedSuperstep() !=
BspService.UNSET_SUPERSTEP ||
- bspServiceMaster.createInputSplits() != -1) {
+ (bspServiceMaster.createVertexInputSplits() != -1 &&
+ bspServiceMaster.createEdgeInputSplits() != -1)) {
long setupMillis = System.currentTimeMillis() - startMillis;
context.getCounter(GIRAPH_TIMERS_COUNTER_GROUP_NAME,
"Setup (milliseconds)").
@@ -125,13 +126,13 @@ public class MasterThread<I extends Writ
if (superstepCounterOn) {
String counterPrefix;
if (cachedSuperstep == -1) {
- counterPrefix = "Vertex input superstep";
+ counterPrefix = "Input superstep";
} else {
counterPrefix = "Superstep " + cachedSuperstep;
}
context.getCounter(GIRAPH_TIMERS_COUNTER_GROUP_NAME,
counterPrefix +
- " (milliseconds)").
+ " (milliseconds)").
increment(superstepMillis);
}
@@ -158,7 +159,7 @@ public class MasterThread<I extends Writ
if (LOG.isInfoEnabled()) {
if (entry.getKey().longValue() ==
BspService.INPUT_SUPERSTEP) {
- LOG.info("vertex input superstep: Took " +
+ LOG.info("input superstep: Took " +
entry.getValue() + " seconds.");
} else {
LOG.info("superstep " + entry.getKey() + ": Took " +
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexInputFormat.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexInputFormat.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexInputFormat.java Fri Nov 2 21:37:30 2012
@@ -40,7 +40,8 @@ import java.util.List;
*/
@SuppressWarnings("rawtypes")
public abstract class VertexInputFormat<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> {
+ V extends Writable, E extends Writable, M extends Writable>
+ implements GiraphInputFormat {
/**
* Logically split the vertices for a graph processing application.
*
@@ -59,6 +60,7 @@ public abstract class VertexInputFormat<
* @param numWorkers Number of workers used for this job
* @return an array of {@link InputSplit}s for the job.
*/
+ @Override
public abstract List<InputSplit> getSplits(
JobContext context, int numWorkers)
throws IOException, InterruptedException;
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallable.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallable.java?rev=1405175&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallable.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallable.java Fri Nov 2 21:37:30 2012
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.partition.PartitionOwner;
+import org.apache.giraph.metrics.GiraphMetrics;
+import org.apache.giraph.metrics.MetricGroup;
+import org.apache.giraph.utils.LoggerUtils;
+import org.apache.giraph.utils.MemoryUtils;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+import com.yammer.metrics.core.Counter;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Load as many vertex input splits as possible.
+ * Every thread will has its own instance of WorkerClientRequestProcessor
+ * to send requests.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public class VertexInputSplitsCallable<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable>
+ extends InputSplitsCallable<I, V, E, M> {
+ /** Class logger */
+ private static final Logger LOG =
+ Logger.getLogger(VertexInputSplitsCallable.class);
+ /** Total vertices loaded */
+ private long totalVerticesLoaded = 0;
+ /** Total edges loaded */
+ private long totalEdgesLoaded = 0;
+ /** Input split max vertices (-1 denotes all) */
+ private final long inputSplitMaxVertices;
+ /** Bsp service worker (only use thread-safe methods) */
+ private final BspServiceWorker<I, V, E, M> bspServiceWorker;
+
+ // Metrics
+ /** number of vertices loaded counter */
+ private final Counter verticesLoadedCounter;
+ /** number of edges loaded counter */
+ private final Counter edgesLoadedCounter;
+
+ /**
+ * Constructor.
+ *
+ * @param context Context
+ * @param graphState Graph state
+ * @param configuration Configuration
+ * @param bspServiceWorker service worker
+ * @param inputSplitPathList List of the paths of the input splits
+ * @param workerInfo This worker's info
+ * @param zooKeeperExt Handle to ZooKeeperExt
+ */
+ public VertexInputSplitsCallable(
+ Mapper<?, ?, ?, ?>.Context context,
+ GraphState<I, V, E, M> graphState,
+ ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
+ BspServiceWorker<I, V, E, M> bspServiceWorker,
+ List<String> inputSplitPathList,
+ WorkerInfo workerInfo,
+ ZooKeeperExt zooKeeperExt) {
+ super(context, graphState, configuration, bspServiceWorker,
+ inputSplitPathList, workerInfo, zooKeeperExt,
+ BspServiceWorker.VERTEX_INPUT_SPLIT_RESERVED_NODE,
+ BspServiceWorker.VERTEX_INPUT_SPLIT_FINISHED_NODE,
+ bspServiceWorker.vertexInputSplitsEvents);
+
+ inputSplitMaxVertices = configuration.getInputSplitMaxVertices();
+ this.bspServiceWorker = bspServiceWorker;
+
+ // Initialize Metrics
+ verticesLoadedCounter = GiraphMetrics.getCounter(MetricGroup.IO,
+ "vertices-loaded");
+ edgesLoadedCounter = GiraphMetrics.getCounter(MetricGroup.IO,
+ "edges-loaded");
+ }
+
+ /**
+ * Read vertices from input split. If testing, the user may request a
+ * maximum number of vertices to be read from an input split.
+ *
+ * @param inputSplit Input split to process with vertex reader
+ * @param graphState Current graph state
+ * @return Vertices and edges loaded from this input split
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Override
+ protected VertexEdgeCount readInputSplit(
+ InputSplit inputSplit,
+ GraphState<I, V, E, M> graphState)
+ throws IOException, InterruptedException {
+ VertexInputFormat<I, V, E, M> vertexInputFormat =
+ configuration.createVertexInputFormat();
+ VertexReader<I, V, E, M> vertexReader =
+ vertexInputFormat.createVertexReader(inputSplit, context);
+ vertexReader.initialize(inputSplit, context);
+ long inputSplitVerticesLoaded = 0;
+ long inputSplitEdgesLoaded = 0;
+ while (vertexReader.nextVertex()) {
+ Vertex<I, V, E, M> readerVertex =
+ vertexReader.getCurrentVertex();
+ if (readerVertex.getId() == null) {
+ throw new IllegalArgumentException(
+ "readInputSplit: Vertex reader returned a vertex " +
+ "without an id! - " + readerVertex);
+ }
+ if (readerVertex.getValue() == null) {
+ readerVertex.setValue(configuration.createVertexValue());
+ }
+ readerVertex.setConf(configuration);
+ readerVertex.setGraphState(graphState);
+
+ PartitionOwner partitionOwner =
+ bspServiceWorker.getVertexPartitionOwner(readerVertex.getId());
+ graphState.getWorkerClientRequestProcessor().sendVertexRequest(
+ partitionOwner, readerVertex);
+ context.progress(); // do this before potential data transfer
+ ++inputSplitVerticesLoaded;
+ inputSplitEdgesLoaded += readerVertex.getNumEdges();
+
+ // Update status every 250k vertices
+ if (((inputSplitVerticesLoaded + totalVerticesLoaded) % 250000) == 0) {
+ LoggerUtils.setStatusAndLog(context, LOG, Level.INFO,
+ "readInputSplit: Loaded " +
+ (inputSplitVerticesLoaded + totalVerticesLoaded) +
+ " vertices " +
+ (inputSplitEdgesLoaded + totalEdgesLoaded) + " edges " +
+ MemoryUtils.getRuntimeMemoryStats());
+ }
+
+ // For sampling, or to limit outlier input splits, the number of
+ // records per input split can be limited
+ if (inputSplitMaxVertices > 0 &&
+ inputSplitVerticesLoaded >= inputSplitMaxVertices) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("readInputSplit: Leaving the input " +
+ "split early, reached maximum vertices " +
+ inputSplitVerticesLoaded);
+ }
+ break;
+ }
+ }
+ vertexReader.close();
+ totalVerticesLoaded += inputSplitVerticesLoaded;
+ verticesLoadedCounter.inc(inputSplitVerticesLoaded);
+ totalEdgesLoaded += inputSplitEdgesLoaded;
+ edgesLoadedCounter.inc(inputSplitEdgesLoaded);
+ return new VertexEdgeCount(
+ inputSplitVerticesLoaded, inputSplitEdgesLoaded);
+ }
+}
+
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallableFactory.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallableFactory.java?rev=1405175&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallableFactory.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexInputSplitsCallableFactory.java Fri Nov 2 21:37:30 2012
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import java.util.List;
+
+/**
+ * Factory for {@link VertexInputSplitsCallable}s.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public class VertexInputSplitsCallableFactory<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable>
+ implements InputSplitsCallableFactory<I, V, E, M> {
+ /** Mapper context. */
+ private final Mapper<?, ?, ?, ?>.Context context;
+ /** Graph state. */
+ private final GraphState<I, V, E, M> graphState;
+ /** Configuration. */
+ private final ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
+ /** {@link BspServiceWorker} we're running on. */
+ private final BspServiceWorker<I, V, E, M> bspServiceWorker;
+ /** List of input split paths. */
+ private final List<String> inputSplitPathList;
+ /** Worker info. */
+ private final WorkerInfo workerInfo;
+ /** {@link ZooKeeperExt} for this worker. */
+ private final ZooKeeperExt zooKeeperExt;
+
+ /**
+ * Constructor.
+ *
+ * @param context Mapper context
+ * @param graphState Graph state
+ * @param configuration Configuration
+ * @param bspServiceWorker Calling {@link BspServiceWorker}
+ * @param inputSplitPathList List of input split paths
+ * @param workerInfo Worker info
+ * @param zooKeeperExt {@link ZooKeeperExt} for this worker
+ */
+ public VertexInputSplitsCallableFactory(
+ Mapper<?, ?, ?, ?>.Context context,
+ GraphState<I, V, E, M> graphState,
+ ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
+ BspServiceWorker<I, V, E, M> bspServiceWorker,
+ List<String> inputSplitPathList,
+ WorkerInfo workerInfo,
+ ZooKeeperExt zooKeeperExt) {
+ this.context = context;
+ this.graphState = graphState;
+ this.configuration = configuration;
+ this.bspServiceWorker = bspServiceWorker;
+ this.inputSplitPathList = inputSplitPathList;
+ this.workerInfo = workerInfo;
+ this.zooKeeperExt = zooKeeperExt;
+ }
+
+ @Override
+ public InputSplitsCallable<I, V, E, M> newCallable() {
+ return new VertexInputSplitsCallable<I, V, E, M>(
+ context,
+ graphState,
+ configuration,
+ bspServiceWorker,
+ inputSplitPathList,
+ workerInfo,
+ zooKeeperExt);
+ }
+}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexReader.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexReader.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexReader.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexReader.java Fri Nov 2 21:37:30 2012
@@ -38,7 +38,7 @@ import java.io.IOException;
public interface VertexReader<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable> {
/**
- * Use the input split and context t o setup reading the vertices.
+ * Use the input split and context to setup reading the vertices.
* Guaranteed to be called prior to any other function.
*
* @param inputSplit Input split to be used for reading vertices.
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexResolver.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexResolver.java?rev=1405175&r1=1405174&r2=1405175&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexResolver.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexResolver.java Fri Nov 2 21:37:30 2012
@@ -57,7 +57,7 @@ public class VertexResolver<I extends Wr
// 1. If the vertex exists, first prune the edges
// 2. If vertex removal desired, remove the vertex.
// 3. If creation of vertex desired, pick first vertex
- // 4. If vertex doesn't exist, but got messages, create
+ // 4. If vertex doesn't exist, but got messages or added edges, create
// 5. If edge addition, add the edges
if (vertex != null) {
if (vertexChanges != null) {
@@ -84,7 +84,8 @@ public class VertexResolver<I extends Wr
vertex = vertexChanges.getAddedVertexList().get(0);
}
}
- if (vertex == null && hasMessages) {
+ if (vertex == null &&
+ (hasMessages || !vertexChanges.getAddedEdgeList().isEmpty())) {
vertex = instantiateVertex();
vertex.initialize(vertexId,
getConf().createVertexValue(),
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexValueInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexValueInputFormat.java?rev=1405175&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexValueInputFormat.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexValueInputFormat.java Fri Nov 2 21:37:30 2012
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Vertex input format that only allows setting vertex id and value. It can
+ * be used in conjunction with {@link EdgeInputFormat}.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+public abstract class VertexValueInputFormat<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable>
+ extends VertexInputFormat<I, V, E, M> {
+ /**
+ * Create a {@link VertexValueReader} for a given split. The framework will
+ * call {@link VertexValueReader#initialize(InputSplit,
+ * TaskAttemptContext)} before the split is used.
+ *
+ * @param split The split to be read
+ * @param context The information about the task
+ * @return A new vertex value reader
+ * @throws IOException
+ */
+ public abstract VertexValueReader<I, V, E, M> createVertexValueReader(
+ InputSplit split, TaskAttemptContext context) throws IOException;
+
+ @Override
+ public final VertexReader<I, V, E, M> createVertexReader(
+ InputSplit split, TaskAttemptContext context) throws IOException {
+ return createVertexValueReader(split, context);
+ }
+}
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexValueReader.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexValueReader.java?rev=1405175&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexValueReader.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexValueReader.java Fri Nov 2 21:37:30 2012
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Vertex reader for {@link VertexValueInputFormat}.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+public abstract class VertexValueReader<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable>
+ extends BasicVertexValueReader<I, V, E, M> {
+ /** Configuration. */
+ private ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
+
+ @Override
+ public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ configuration = new ImmutableClassesGiraphConfiguration<I, V, E, M>(
+ context.getConfiguration());
+ }
+
+ @Override
+ public final Vertex<I, V, E, M> getCurrentVertex() throws IOException,
+ InterruptedException {
+ Vertex<I, V, E, M> vertex = getConf().createVertex();
+ vertex.initialize(getCurrentVertexId(), getCurrentVertexValue());
+ return vertex;
+ }
+
+ public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
+ return configuration;
+ }
+}
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/io/GiraphFileInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/io/GiraphFileInputFormat.java?rev=1405175&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/io/GiraphFileInputFormat.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/io/GiraphFileInputFormat.java Fri Nov 2 21:37:30 2012
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.InvalidInputException;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]
+import org.apache.hadoop.mapreduce.security.TokenCache;
+end[HADOOP_NON_SECURE]*/
+
+/**
+ * Provides functionality similar to {@link FileInputFormat},
+ * but allows for different data sources (vertex and edge data).
+ *
+ * @param <K> Key
+ * @param <V> Value
+ */
+public abstract class GiraphFileInputFormat<K, V>
+ extends FileInputFormat<K, V> {
+ /** Vertex input file paths. */
+ public static final String VERTEX_INPUT_DIR = "giraph.vertex.input.dir";
+ /** Edge input file paths. */
+ public static final String EDGE_INPUT_DIR = "giraph.edge.input.dir";
+ /** Number of vertex input files. */
+ public static final String NUM_VERTEX_INPUT_FILES =
+ "giraph.input.vertex.num.files";
+ /** Number of edge input files. */
+ public static final String NUM_EDGE_INPUT_FILES =
+ "giraph.input.edge.num.files";
+
+ /** Split slop. */
+ private static final double SPLIT_SLOP = 1.1; // 10% slop
+
+ /** Filter for hidden files. */
+ private static final PathFilter HIDDEN_FILE_FILTER = new PathFilter() {
+ public boolean accept(Path p) {
+ String name = p.getName();
+ return !name.startsWith("_") && !name.startsWith(".");
+ }
+ };
+
+ /** Class logger. */
+ private static final Logger LOG =
+ Logger.getLogger(GiraphFileInputFormat.class);
+
+ /**
+ * Add a {@link org.apache.hadoop.fs.Path} to the list of vertex inputs.
+ *
+ * @param job The {@link org.apache.hadoop.mapreduce.Job} to modify
+ * @param path {@link org.apache.hadoop.fs.Path} to be added to the list of
+ * vertex inputs
+ */
+ public static void addVertexInputPath(Job job, Path path) throws IOException {
+ Configuration conf = job.getConfiguration();
+ path = path.getFileSystem(conf).makeQualified(path);
+ String dirStr = StringUtils.escapeString(path.toString());
+ String dirs = conf.get(VERTEX_INPUT_DIR);
+ conf.set(VERTEX_INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
+ }
+
+ /**
+ * Add a {@link org.apache.hadoop.fs.Path} to the list of edge inputs.
+ *
+ * @param job The {@link org.apache.hadoop.mapreduce.Job} to modify
+ * @param path {@link org.apache.hadoop.fs.Path} to be added to the list of
+ * edge inputs
+ */
+ public static void addEdgeInputPath(Job job, Path path) throws IOException {
+ Configuration conf = job.getConfiguration();
+ path = path.getFileSystem(conf).makeQualified(path);
+ String dirStr = StringUtils.escapeString(path.toString());
+ String dirs = conf.get(EDGE_INPUT_DIR);
+ conf.set(EDGE_INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
+ }
+
+ /**
+ * Get the list of vertex input {@link Path}s.
+ *
+ * @param context The job
+ * @return The list of input {@link Path}s
+ */
+ public static Path[] getVertexInputPaths(JobContext context) {
+ String dirs = context.getConfiguration().get(VERTEX_INPUT_DIR, "");
+ String [] list = StringUtils.split(dirs);
+ Path[] result = new Path[list.length];
+ for (int i = 0; i < list.length; i++) {
+ result[i] = new Path(StringUtils.unEscapeString(list[i]));
+ }
+ return result;
+ }
+
+ /**
+ * Get the list of edge input {@link Path}s.
+ *
+ * @param context The job
+ * @return The list of input {@link Path}s
+ */
+ public static Path[] getEdgeInputPaths(JobContext context) {
+ String dirs = context.getConfiguration().get(EDGE_INPUT_DIR, "");
+ String [] list = StringUtils.split(dirs);
+ Path[] result = new Path[list.length];
+ for (int i = 0; i < list.length; i++) {
+ result[i] = new Path(StringUtils.unEscapeString(list[i]));
+ }
+ return result;
+ }
+
+ /**
+ * Proxy PathFilter that accepts a path only if all filters given in the
+ * constructor do. Used by the listPaths() to apply the built-in
+ * HIDDEN_FILE_FILTER together with a user provided one (if any).
+ */
+ private static class MultiPathFilter implements PathFilter {
+ /** List of filters. */
+ private List<PathFilter> filters;
+
+ /**
+ * Constructor.
+ *
+ * @param filters The list of filters
+ */
+ public MultiPathFilter(List<PathFilter> filters) {
+ this.filters = filters;
+ }
+
+ /**
+ * True iff all filters accept the given path.
+ *
+ * @param path The path to check
+ * @return Whether the path is accepted
+ */
+ public boolean accept(Path path) {
+ for (PathFilter filter : filters) {
+ if (!filter.accept(path)) {
+ return false;
+ }
+ }
+ return true;
+ }
+ }
+
+ /**
+ * Common method for listing vertex/edge input directories.
+ *
+ * @param job The job
+ * @param dirs list of vertex/edge input paths
+ * @return Array of FileStatus objects
+ * @throws IOException
+ */
+ private List<FileStatus> listStatus(JobContext job, Path[] dirs)
+ throws IOException {
+ List<FileStatus> result = new ArrayList<FileStatus>();
+ if (dirs.length == 0) {
+ throw new IOException("No input paths specified in job");
+ }
+
+/*if[HADOOP_NON_SECURE]
+else[HADOOP_NON_SECURE]
+ // get tokens for all the required FileSystems..
+ TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
+ job.getConfiguration());
+end[HADOOP_NON_SECURE]*/
+
+ List<IOException> errors = new ArrayList<IOException>();
+
+ // creates a MultiPathFilter with the HIDDEN_FILE_FILTER and the
+ // user provided one (if any).
+ List<PathFilter> filters = new ArrayList<PathFilter>();
+ filters.add(HIDDEN_FILE_FILTER);
+ PathFilter jobFilter = getInputPathFilter(job);
+ if (jobFilter != null) {
+ filters.add(jobFilter);
+ }
+ PathFilter inputFilter = new MultiPathFilter(filters);
+
+ for (Path p : dirs) {
+ FileSystem fs = p.getFileSystem(job.getConfiguration());
+ FileStatus[] matches = fs.globStatus(p, inputFilter);
+ if (matches == null) {
+ errors.add(new IOException("Input path does not exist: " + p));
+ } else if (matches.length == 0) {
+ errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
+ } else {
+ for (FileStatus globStat: matches) {
+ if (globStat.isDir()) {
+ Collections.addAll(result, fs.listStatus(globStat.getPath()));
+ } else {
+ result.add(globStat);
+ }
+ }
+ }
+ }
+
+ if (!errors.isEmpty()) {
+ throw new InvalidInputException(errors);
+ }
+ LOG.info("Total input paths to process : " + result.size());
+ return result;
+ }
+
+ /**
+ * List vertex input directories.
+ *
+ * @param job the job to list vertex input paths for
+ * @return array of FileStatus objects
+ * @throws IOException if zero items.
+ */
+ protected List<FileStatus> listVertexStatus(JobContext job)
+ throws IOException {
+ return listStatus(job, getVertexInputPaths(job));
+ }
+
+ /**
+ * List edge input directories.
+ *
+ * @param job the job to list edge input paths for
+ * @return array of FileStatus objects
+ * @throws IOException if zero items.
+ */
+ protected List<FileStatus> listEdgeStatus(JobContext job)
+ throws IOException {
+ return listStatus(job, getEdgeInputPaths(job));
+ }
+
+ /**
+ * Common method for generating the list of vertex/edge input splits.
+ *
+ * @param job The job
+ * @param files Array of FileStatus objects for vertex/edge input files
+ * @return The list of vertex/edge input splits
+ * @throws IOException
+ */
+ private List<InputSplit> getSplits(JobContext job, List<FileStatus> files)
+ throws IOException {
+ long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
+ long maxSize = getMaxSplitSize(job);
+
+ // generate splits
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+
+ for (FileStatus file: files) {
+ Path path = file.getPath();
+ FileSystem fs = path.getFileSystem(job.getConfiguration());
+ long length = file.getLen();
+ BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
+ if ((length != 0) && isSplitable(job, path)) {
+ long blockSize = file.getBlockSize();
+ long splitSize = computeSplitSize(blockSize, minSize, maxSize);
+
+ long bytesRemaining = length;
+ while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
+ int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
+ splits.add(new FileSplit(path, length - bytesRemaining, splitSize,
+ blkLocations[blkIndex].getHosts()));
+ bytesRemaining -= splitSize;
+ }
+
+ if (bytesRemaining != 0) {
+ splits.add(new FileSplit(path, length - bytesRemaining,
+ bytesRemaining,
+ blkLocations[blkLocations.length - 1].getHosts()));
+ }
+ } else if (length != 0) {
+ splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
+ } else {
+ //Create empty hosts array for zero length files
+ splits.add(new FileSplit(path, 0, length, new String[0]));
+ }
+ }
+ return splits;
+ }
+
+ /**
+ * Generate the list of vertex input splits.
+ *
+ * @param job The job
+ * @return The list of vertex input splits
+ * @throws IOException
+ */
+ public List<InputSplit> getVertexSplits(JobContext job) throws IOException {
+ List<FileStatus> files = listVertexStatus(job);
+ List<InputSplit> splits = getSplits(job, files);
+ // Save the number of input files in the job-conf
+ job.getConfiguration().setLong(NUM_VERTEX_INPUT_FILES, files.size());
+ LOG.debug("Total # of vertex splits: " + splits.size());
+ return splits;
+ }
+
+ /**
+ * Generate the list of edge input splits.
+ *
+ * @param job The job
+ * @return The list of edge input splits
+ * @throws IOException
+ */
+ public List<InputSplit> getEdgeSplits(JobContext job) throws IOException {
+ List<FileStatus> files = listEdgeStatus(job);
+ List<InputSplit> splits = getSplits(job, files);
+ // Save the number of input files in the job-conf
+ job.getConfiguration().setLong(NUM_EDGE_INPUT_FILES, files.size());
+ LOG.debug("Total # of edge splits: " + splits.size());
+ return splits;
+ }
+}
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/io/GiraphTextInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/io/GiraphTextInputFormat.java?rev=1405175&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/io/GiraphTextInputFormat.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/io/GiraphTextInputFormat.java Fri Nov 2 21:37:30 2012
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
+
+/**
+ * Provides functionality similar to {@link org.apache.hadoop
+ * .mapreduce.lib.input.TextInputFormat}, but allows for different data
+ * sources (vertex and edge data).
+ */
+public class GiraphTextInputFormat
+ extends GiraphFileInputFormat<LongWritable, Text> {
+ @Override
+ public RecordReader<LongWritable, Text>
+ createRecordReader(InputSplit split, TaskAttemptContext context) {
+ return new LineRecordReader();
+ }
+
+ @Override
+ protected boolean isSplitable(JobContext context, Path file) {
+ CompressionCodec codec =
+ new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
+ return codec == null;
+ }
+}
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/io/IntIntTextVertexValueInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/io/IntIntTextVertexValueInputFormat.java?rev=1405175&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/io/IntIntTextVertexValueInputFormat.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/io/IntIntTextVertexValueInputFormat.java Fri Nov 2 21:37:30 2012
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.giraph.utils.IntPair;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.regex.Pattern;
+
+/**
+ * Simple text-based {@link org.apache.giraph.graph.VertexValueInputFormat}
+ * for integer ids and values.
+ *
+ * Each line consists of: id, value
+ *
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public class IntIntTextVertexValueInputFormat<E extends Writable,
+ M extends Writable> extends
+ TextVertexValueInputFormat<IntWritable, IntWritable, E, M> {
+ /** Separator for id and value */
+ private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
+
+ @Override
+ public TextVertexValueReader createVertexValueReader(
+ InputSplit split, TaskAttemptContext context) throws IOException {
+ return new IntIntTextVertexValueReader();
+ }
+
+ /**
+ * {@link org.apache.giraph.graph.VertexValueReader} associated with
+ * {@link IntIntTextVertexValueInputFormat}.
+ */
+ public class IntIntTextVertexValueReader extends
+ TextVertexValueReaderFromEachLineProcessed<IntPair> {
+
+ @Override
+ protected IntPair preprocessLine(Text line) throws IOException {
+ String[] tokens = SEPARATOR.split(line.toString());
+ return new IntPair(Integer.valueOf(tokens[0]),
+ Integer.valueOf(tokens[1]));
+ }
+
+ @Override
+ protected IntWritable getId(IntPair data) throws IOException {
+ return new IntWritable(data.getFirst());
+ }
+
+ @Override
+ protected IntWritable getValue(IntPair data) throws IOException {
+ return new IntWritable(data.getSecond());
+ }
+ }
+}
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/io/IntNullTextEdgeInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/io/IntNullTextEdgeInputFormat.java?rev=1405175&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/io/IntNullTextEdgeInputFormat.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/io/IntNullTextEdgeInputFormat.java Fri Nov 2 21:37:30 2012
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.giraph.utils.IntPair;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.regex.Pattern;
+
+/**
+ * Simple text-based {@link org.apache.giraph.graph.EdgeInputFormat} for
+ * unweighted graphs with int ids.
+ *
+ * Each line consists of: source_vertex, target_vertex
+ */
+public class IntNullTextEdgeInputFormat extends
+ TextEdgeInputFormat<IntWritable, NullWritable> {
+ /** Splitter for endpoints */
+ private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
+
+ @Override
+ public TextEdgeReader createEdgeReader(
+ InputSplit split, TaskAttemptContext context) throws IOException {
+ return new IntNullTextEdgeReader();
+ }
+
+ /**
+ * {@link org.apache.giraph.graph.EdgeReader} associated with
+ * {@link IntNullTextEdgeInputFormat}.
+ */
+ public class IntNullTextEdgeReader extends
+ TextEdgeReaderFromEachLineProcessed<IntPair> {
+ @Override
+ protected IntPair preprocessLine(Text line) throws IOException {
+ String[] tokens = SEPARATOR.split(line.toString());
+ return new IntPair(Integer.valueOf(tokens[0]),
+ Integer.valueOf(tokens[1]));
+ }
+
+ @Override
+ protected IntWritable getSourceVertexId(IntPair endpoints)
+ throws IOException {
+ return new IntWritable(endpoints.getFirst());
+ }
+
+ @Override
+ protected IntWritable getTargetVertexId(IntPair endpoints)
+ throws IOException {
+ return new IntWritable(endpoints.getSecond());
+ }
+
+ @Override
+ protected NullWritable getValue(IntPair endpoints) throws IOException {
+ return NullWritable.get();
+ }
+ }
+}