You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/05/20 19:27:05 UTC
[09/12] GIRAPH-667: Decouple Vertex data and Computation,
make Computation and Combiner classes switchable (majakabiljo)
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 82e1b1e..99b28df 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -43,7 +43,6 @@ import org.apache.giraph.utils.ProgressableUtils;
import org.apache.giraph.utils.ReflectionUtils;
import org.apache.giraph.worker.BspServiceWorker;
import org.apache.giraph.worker.InputSplitsCallable;
-import org.apache.giraph.worker.WorkerAggregatorUsage;
import org.apache.giraph.worker.WorkerContext;
import org.apache.giraph.worker.WorkerObserver;
import org.apache.giraph.zk.ZooKeeperManager;
@@ -72,9 +71,10 @@ import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import static org.apache.giraph.conf.GiraphConstants.EDGE_VALUE_CLASS;
-import static org.apache.giraph.conf.GiraphConstants.MESSAGE_VALUE_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.INCOMING_MESSAGE_VALUE_CLASS;
import static org.apache.giraph.conf.GiraphConstants.VERTEX_ID_CLASS;
import static org.apache.giraph.conf.GiraphConstants.VERTEX_VALUE_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.OUTGOING_MESSAGE_VALUE_CLASS;
/**
* The Giraph-specific business logic for a single BSP
@@ -86,11 +86,10 @@ import static org.apache.giraph.conf.GiraphConstants.VERTEX_VALUE_CLASS;
* @param <I> Vertex id
* @param <V> Vertex data
* @param <E> Edge data
- * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
public class GraphTaskManager<I extends WritableComparable, V extends Writable,
- E extends Writable, M extends Writable> implements
+ E extends Writable> implements
ResetSuperstepMetricsObserver {
/*if_not[PURE_YARN]
static { // Eliminate this? Even MRv1 tasks should not need it here.
@@ -112,9 +111,9 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
/** Class logger */
private static final Logger LOG = Logger.getLogger(GraphTaskManager.class);
/** Coordination service worker */
- private CentralizedServiceWorker<I, V, E, M> serviceWorker;
+ private CentralizedServiceWorker<I, V, E> serviceWorker;
/** Coordination service master */
- private CentralizedServiceMaster<I, V, E, M> serviceMaster;
+ private CentralizedServiceMaster<I, V, E> serviceMaster;
/** Coordination service master thread */
private Thread masterThread = null;
/** The worker should be run exactly once, or else there is a problem. */
@@ -122,7 +121,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
/** Manages the ZooKeeper servers if necessary (dynamic startup) */
private ZooKeeperManager zkManager;
/** Configuration */
- private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+ private ImmutableClassesGiraphConfiguration<I, V, E> conf;
/** Already complete? */
private boolean done = false;
/** What kind of functions is this mapper doing? */
@@ -176,7 +175,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
public void setup(Path[] zkPathList)
throws IOException, InterruptedException {
context.setStatus("setup: Beginning worker setup.");
- conf = new ImmutableClassesGiraphConfiguration<I, V, E, M>(
+ conf = new ImmutableClassesGiraphConfiguration<I, V, E>(
context.getConfiguration());
determineClassTypes(conf);
// configure global logging level for Giraph job
@@ -233,8 +232,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
if (collectInputSuperstepStats(finishedSuperstepStats)) {
return;
}
- WorkerAggregatorUsage aggregatorUsage =
- prepareAggregatorsAndGraphState();
+ prepareGraphStateAndWorkerContext();
List<PartitionStats> partitionStatsList = new ArrayList<PartitionStats>();
int numComputeThreads = conf.getNumComputeThreads();
@@ -243,24 +241,22 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
final long superstep = serviceWorker.getSuperstep();
GiraphTimerContext superstepTimerContext =
getTimerForThisSuperstep(superstep);
- GraphState<I, V, E, M> graphState =
- new GraphState<I, V, E, M>(superstep,
- finishedSuperstepStats.getVertexCount(),
- finishedSuperstepStats.getEdgeCount(),
- context, this, null, aggregatorUsage);
+ GraphState graphState = new GraphState(superstep,
+ finishedSuperstepStats.getVertexCount(),
+ finishedSuperstepStats.getEdgeCount(),
+ context);
Collection<? extends PartitionOwner> masterAssignedPartitionOwners =
- serviceWorker.startSuperstep(graphState);
+ serviceWorker.startSuperstep();
if (LOG.isDebugEnabled()) {
LOG.debug("execute: " + MemoryUtils.getRuntimeMemoryStats());
}
context.progress();
serviceWorker.exchangeVertexPartitions(masterAssignedPartitionOwners);
context.progress();
- graphState = checkSuperstepRestarted(
- aggregatorUsage, superstep, graphState);
+ graphState = checkSuperstepRestarted(superstep, graphState);
prepareForSuperstep(graphState);
context.progress();
- MessageStoreByPartition<I, M> messageStore =
+ MessageStoreByPartition<I, Writable> messageStore =
serviceWorker.getServerData().getCurrentMessageStore();
int numPartitions = serviceWorker.getPartitionStore().getNumPartitions();
int numThreads = Math.min(numComputeThreads, numPartitions);
@@ -276,14 +272,14 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
messageStore, numPartitions, numThreads);
}
finishedSuperstepStats = completeSuperstepAndCollectStats(
- partitionStatsList, superstepTimerContext, graphState);
+ partitionStatsList, superstepTimerContext);
// END of superstep compute loop
} while (!finishedSuperstepStats.allVerticesHalted());
if (LOG.isInfoEnabled()) {
LOG.info("execute: BSP application done (global vertices marked done)");
}
- updateSuperstepGraphState(aggregatorUsage);
+ updateSuperstepGraphState();
postApplication();
}
@@ -364,15 +360,12 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
/**
* Utility to place a new, updated GraphState object into the serviceWorker.
- * @param aggregatorUsage handle to aggregation metadata
*/
- private void updateSuperstepGraphState(
- WorkerAggregatorUsage aggregatorUsage) {
+ private void updateSuperstepGraphState() {
serviceWorker.getWorkerContext().setGraphState(
- new GraphState<I, V, E, M>(serviceWorker.getSuperstep(),
- finishedSuperstepStats.getVertexCount(),
- finishedSuperstepStats.getEdgeCount(), context, this, null,
- aggregatorUsage));
+ new GraphState(serviceWorker.getSuperstep(),
+ finishedSuperstepStats.getVertexCount(),
+ finishedSuperstepStats.getEdgeCount(), context));
}
/**
@@ -380,15 +373,12 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
* end of each superstep processing loop in the <code>execute</code> method.
* @param partitionStatsList list of stas for each superstep to append to
* @param superstepTimerContext for job metrics
- * @param graphState the graph state metadata
* @return the collected stats at the close of the current superstep.
*/
private FinishedSuperstepStats completeSuperstepAndCollectStats(
List<PartitionStats> partitionStatsList,
- GiraphTimerContext superstepTimerContext,
- GraphState<I, V, E, M> graphState) {
- finishedSuperstepStats =
- serviceWorker.finishSuperstep(graphState, partitionStatsList);
+ GiraphTimerContext superstepTimerContext) {
+ finishedSuperstepStats = serviceWorker.finishSuperstep(partitionStatsList);
superstepTimerContext.stop();
if (conf.metricsEnabled()) {
GiraphMetrics.get().perSuperstep().printSummary(System.err);
@@ -401,7 +391,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
* operations for the next superstep.
* @param graphState graph state metadata object
*/
- private void prepareForSuperstep(GraphState<I, V, E, M> graphState) {
+ private void prepareForSuperstep(GraphState graphState) {
serviceWorker.prepareSuperstep();
serviceWorker.getWorkerContext().setGraphState(graphState);
@@ -417,15 +407,11 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
}
/**
- * Prepare aggregators and worker context for superstep cycles.
- * @return aggregator metadata object
+ * Prepare graph state and worker context for superstep cycles.
*/
- private WorkerAggregatorUsage prepareAggregatorsAndGraphState() {
- WorkerAggregatorUsage aggregatorUsage =
- serviceWorker.getAggregatorHandler();
- updateSuperstepGraphState(aggregatorUsage);
+ private void prepareGraphStateAndWorkerContext() {
+ updateSuperstepGraphState();
workerContextPreApp();
- return aggregatorUsage;
}
/**
@@ -459,18 +445,22 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
public void determineClassTypes(Configuration conf) {
ImmutableClassesGiraphConfiguration giraphConf =
new ImmutableClassesGiraphConfiguration(conf);
- Class<? extends Vertex<I, V, E, M>> vertexClass =
- giraphConf.getVertexClass();
- List<Class<?>> classList = ReflectionUtils.<Vertex>getTypeArguments(
- Vertex.class, vertexClass);
+ Class<? extends Computation<I, V, E, Writable, Writable>> computationClass =
+ giraphConf.getComputationClass();
+ List<Class<?>> classList = ReflectionUtils.<Computation>getTypeArguments(
+ Computation.class, computationClass);
Type vertexIndexType = classList.get(0);
Type vertexValueType = classList.get(1);
Type edgeValueType = classList.get(2);
- Type messageValueType = classList.get(3);
+ Type incomingMessageValueType = classList.get(3);
+ Type outgoingMessageValueType = classList.get(4);
VERTEX_ID_CLASS.set(conf, (Class<WritableComparable>) vertexIndexType);
VERTEX_VALUE_CLASS.set(conf, (Class<Writable>) vertexValueType);
EDGE_VALUE_CLASS.set(conf, (Class<Writable>) edgeValueType);
- MESSAGE_VALUE_CLASS.set(conf, (Class<Writable>) messageValueType);
+ INCOMING_MESSAGE_VALUE_CLASS.set(conf,
+ (Class<Writable>) incomingMessageValueType);
+ OUTGOING_MESSAGE_VALUE_CLASS.set(conf,
+ (Class<Writable>) outgoingMessageValueType);
}
/**
@@ -567,16 +557,16 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
LOG.info("setup: Starting up BspServiceMaster " +
"(master thread)...");
}
- serviceMaster = new BspServiceMaster<I, V, E, M>(
+ serviceMaster = new BspServiceMaster<I, V, E>(
serverPortList, sessionMsecTimeout, context, this);
- masterThread = new MasterThread<I, V, E, M>(serviceMaster, context);
+ masterThread = new MasterThread<I, V, E>(serviceMaster, context);
masterThread.start();
}
if (graphFunctions.isWorker()) {
if (LOG.isInfoEnabled()) {
LOG.info("setup: Starting up BspServiceWorker...");
}
- serviceWorker = new BspServiceWorker<I, V, E, M>(
+ serviceWorker = new BspServiceWorker<I, V, E>(
serverPortList,
sessionMsecTimeout,
context,
@@ -729,8 +719,8 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
*/
private void processGraphPartitions(final Mapper<?, ?, ?, ?>.Context context,
List<PartitionStats> partitionStatsList,
- final GraphState<I, V, E, M> graphState,
- final MessageStoreByPartition<I, M> messageStore,
+ final GraphState graphState,
+ final MessageStoreByPartition<I, Writable> messageStore,
int numPartitions,
int numThreads) {
final BlockingQueue<Integer> computePartitionIdQueue =
@@ -748,7 +738,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
@Override
public Callable<Collection<PartitionStats>> newCallable(
int callableId) {
- return new ComputeCallable<I, V, E, M>(
+ return new ComputeCallable<I, V, E, Writable, Writable>(
context,
graphState,
messageStore,
@@ -769,14 +759,12 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
/**
* Handle the event that this superstep is a restart of a failed one.
- * @param aggregatorUsage aggregator metadata
* @param superstep current superstep
* @param graphState the BSP graph state
* @return the graph state, updated if this is a restart superstep
*/
- private GraphState<I, V, E, M> checkSuperstepRestarted(
- WorkerAggregatorUsage aggregatorUsage, long superstep,
- GraphState<I, V, E, M> graphState) throws IOException {
+ private GraphState checkSuperstepRestarted(long superstep,
+ GraphState graphState) throws IOException {
// Might need to restart from another superstep
// (manually or automatic), or store a checkpoint
if (serviceWorker.getRestartedSuperstep() == superstep) {
@@ -788,10 +776,10 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
finishedSuperstepStats = new FinishedSuperstepStats(0, false,
vertexEdgeCount.getVertexCount(), vertexEdgeCount.getEdgeCount(),
false);
- graphState = new GraphState<I, V, E, M>(superstep,
+ graphState = new GraphState(superstep,
finishedSuperstepStats.getVertexCount(),
finishedSuperstepStats.getEdgeCount(),
- context, this, null, aggregatorUsage);
+ context);
} else if (serviceWorker.checkpointFrequencyMet(superstep)) {
serviceWorker.storeCheckpoint();
}
@@ -927,4 +915,8 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
"original expection will be rethrown", e1);
}
}
+
+ public ImmutableClassesGiraphConfiguration<I, V, E> getConf() {
+ return conf;
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java b/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java
index 61624e5..82fbe0c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java
@@ -28,30 +28,21 @@ import org.apache.giraph.edge.MutableEdgesWrapper;
import org.apache.giraph.edge.MutableOutEdges;
import org.apache.giraph.edge.OutEdges;
import org.apache.giraph.edge.StrictRandomAccessOutEdges;
-import org.apache.giraph.partition.PartitionContext;
-import org.apache.giraph.worker.WorkerAggregatorUsage;
-import org.apache.giraph.worker.WorkerContext;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper;
-import java.io.IOException;
import java.util.Iterator;
/**
- * Basic abstract class for writing a BSP application for computation.
- * Giraph will store Vertex value and edges, hence all user data should
- * be stored as part of the vertex value.
+ * Class which holds vertex id, data and edges.
*
* @param <I> Vertex id
* @param <V> Vertex data
* @param <E> Edge data
- * @param <M> Message data
*/
-public abstract class Vertex<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- extends DefaultImmutableClassesGiraphConfigurable<I, V, E, M>
- implements WorkerAggregatorUsage {
+public class Vertex<I extends WritableComparable,
+ V extends Writable, E extends Writable>
+ extends DefaultImmutableClassesGiraphConfigurable<I, V, E> {
/** Vertex id. */
private I id;
/** Vertex value. */
@@ -60,8 +51,6 @@ public abstract class Vertex<I extends WritableComparable,
private OutEdges<I, E> edges;
/** If true, do not do anymore computation on this vertex. */
private boolean halt;
- /** Global graph state **/
- private GraphState<I, V, E, M> graphState;
/**
* Initialize id, value, and edges.
@@ -109,25 +98,6 @@ public abstract class Vertex<I extends WritableComparable,
}
/**
- * Must be defined by user to do computation on a single Vertex.
- *
- * @param messages Messages that were sent to this vertex in the previous
- * superstep. Each message is only guaranteed to have
- * a life expectancy as long as next() is not called.
- * @throws IOException
- */
- public abstract void compute(Iterable<M> messages) throws IOException;
-
- /**
- * Retrieves the current superstep.
- *
- * @return Current superstep
- */
- public long getSuperstep() {
- return graphState.getSuperstep();
- }
-
- /**
* Get the vertex id.
*
* @return My vertex id.
@@ -155,26 +125,6 @@ public abstract class Vertex<I extends WritableComparable,
}
/**
- * Get the total (all workers) number of vertices that
- * existed in the previous superstep.
- *
- * @return Total number of vertices (-1 if first superstep)
- */
- public long getTotalNumVertices() {
- return graphState.getTotalNumVertices();
- }
-
- /**
- * Get the total (all workers) number of edges that
- * existed in the previous superstep.
- *
- * @return Total number of edges (-1 if first superstep)
- */
- public long getTotalNumEdges() {
- return graphState.getTotalNumEdges();
- }
-
- /**
* Get a read-only view of the out-edges of this vertex.
* Note: edge objects returned by this iterable may be invalidated as soon
* as the next element is requested. Thus, keeping a reference to an edge
@@ -329,32 +279,6 @@ public abstract class Vertex<I extends WritableComparable,
}
/**
- * Send a message to a vertex id. The message should not be mutated after
- * this method returns or else undefined results could occur.
- *
- * @param id Vertex id to send the message to
- * @param message Message data to send. Note that after the message is sent,
- * the user should not modify the object.
- */
- public void sendMessage(I id, M message) {
- if (graphState.getWorkerClientRequestProcessor().
- sendMessageRequest(id, message)) {
- graphState.getGraphTaskManager().notifySentMessages();
- }
- }
-
- /**
- * Send a message to all edges.
- *
- * @param message Message sent to all edges.
- */
- public void sendMessageToAllEdges(M message) {
- for (Edge<I, E> edge : getEdges()) {
- sendMessage(edge.getTargetVertexId(), message);
- }
- }
-
- /**
* After this is called, the compute() code will no longer be called for
* this vertex unless a message is sent to it. Then the compute() code
* will be called once again until this function is called. The
@@ -398,115 +322,6 @@ public abstract class Vertex<I extends WritableComparable,
edges.remove(targetVertexId);
}
- /**
- * Sends a request to create a vertex that will be available during the
- * next superstep.
- *
- * @param id Vertex id
- * @param value Vertex value
- * @param edges Initial edges
- */
- public void addVertexRequest(I id, V value, OutEdges<I, E> edges)
- throws IOException {
- Vertex<I, V, E, M> vertex = getConf().createVertex();
- vertex.initialize(id, value, edges);
- graphState.getWorkerClientRequestProcessor().addVertexRequest(vertex);
- }
-
- /**
- * Sends a request to create a vertex that will be available during the
- * next superstep.
- *
- * @param id Vertex id
- * @param value Vertex value
- */
- public void addVertexRequest(I id, V value) throws IOException {
- addVertexRequest(id, value, getConf().createAndInitializeOutEdges());
- }
-
- /**
- * Request to remove a vertex from the graph
- * (applied just prior to the next superstep).
- *
- * @param vertexId Id of the vertex to be removed.
- */
- public void removeVertexRequest(I vertexId) throws IOException {
- graphState.getWorkerClientRequestProcessor().
- removeVertexRequest(vertexId);
- }
-
- /**
- * Request to add an edge of a vertex in the graph
- * (processed just prior to the next superstep)
- *
- * @param sourceVertexId Source vertex id of edge
- * @param edge Edge to add
- */
- public void addEdgeRequest(I sourceVertexId, Edge<I, E> edge)
- throws IOException {
- graphState.getWorkerClientRequestProcessor().
- addEdgeRequest(sourceVertexId, edge);
- }
-
- /**
- * Request to remove all edges from a given source vertex to a given target
- * vertex (processed just prior to the next superstep).
- *
- * @param sourceVertexId Source vertex id
- * @param targetVertexId Target vertex id
- */
- public void removeEdgesRequest(I sourceVertexId, I targetVertexId)
- throws IOException {
- graphState.getWorkerClientRequestProcessor().
- removeEdgesRequest(sourceVertexId, targetVertexId);
- }
-
- /**
- * Set the graph state for all workers
- *
- * @param graphState Graph state for all workers
- */
- public void setGraphState(GraphState<I, V, E, M> graphState) {
- this.graphState = graphState;
- }
-
- /**
- * Get the mapper context
- *
- * @return Mapper context
- */
- public Mapper.Context getContext() {
- return graphState.getContext();
- }
-
- /**
- * Get the partition context
- *
- * @return Partition context
- */
- public PartitionContext getPartitionContext() {
- return graphState.getPartitionContext();
- }
-
- /**
- * Get the worker context
- *
- * @return WorkerContext context
- */
- public WorkerContext getWorkerContext() {
- return graphState.getGraphTaskManager().getWorkerContext();
- }
-
- @Override
- public <A extends Writable> void aggregate(String name, A value) {
- graphState.getWorkerAggregatorUsage().aggregate(name, value);
- }
-
- @Override
- public <A extends Writable> A getAggregatedValue(String name) {
- return graphState.getWorkerAggregatorUsage().<A>getAggregatedValue(name);
- }
-
@Override
public String toString() {
return "Vertex(id=" + getId() + ",value=" + getValue() +
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/graph/VertexChanges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexChanges.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexChanges.java
index 9474636..3d09c06 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexChanges.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexChanges.java
@@ -31,18 +31,17 @@ import java.util.List;
* @param <I> Vertex index value
* @param <V> Vertex value
* @param <E> Edge value
- * @param <M> Message value
*/
@SuppressWarnings("rawtypes")
public interface VertexChanges<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> {
+ V extends Writable, E extends Writable> {
/**
* Get the added vertices for this particular vertex index from the previous
* superstep.
*
* @return List of vertices for this vertex index.
*/
- List<Vertex<I, V, E, M>> getAddedVertexList();
+ List<Vertex<I, V, E>> getAddedVertexList();
/**
* Get the number of times this vertex was removed in the previous
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java
index 75c0aef..6f54dc7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java
@@ -41,15 +41,13 @@ import java.util.List;
* @param <I> Vertex index value
* @param <V> Vertex value
* @param <E> Edge value
- * @param <M> Message value
*/
@SuppressWarnings("rawtypes")
public class VertexMutations<I extends WritableComparable,
- V extends Writable, E extends Writable,
- M extends Writable> implements VertexChanges<I, V, E, M>,
+ V extends Writable, E extends Writable> implements VertexChanges<I, V, E>,
Writable, ImmutableClassesGiraphConfigurable {
/** List of added vertices during the last superstep */
- private final List<Vertex<I, V, E, M>> addedVertexList = Lists.newArrayList();
+ private final List<Vertex<I, V, E>> addedVertexList = Lists.newArrayList();
/** Count of remove vertex requests */
private int removedVertexCount = 0;
/** List of added edges */
@@ -57,15 +55,15 @@ public class VertexMutations<I extends WritableComparable,
/** List of removed edges */
private final List<I> removedEdgeList = Lists.newArrayList();
/** Configuration */
- private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+ private ImmutableClassesGiraphConfiguration<I, V, E> conf;
/**
* Copy the vertex mutations.
*
* @return Copied vertex mutations
*/
- public VertexMutations<I, V, E, M> copy() {
- VertexMutations<I, V, E, M> copied = new VertexMutations<I, V, E, M>();
+ public VertexMutations<I, V, E> copy() {
+ VertexMutations<I, V, E> copied = new VertexMutations<I, V, E>();
copied.addedVertexList.addAll(this.addedVertexList);
copied.removedVertexCount = this.removedVertexCount;
copied.addedEdgeList.addAll(this.addedEdgeList);
@@ -75,7 +73,7 @@ public class VertexMutations<I extends WritableComparable,
}
@Override
- public List<Vertex<I, V, E, M>> getAddedVertexList() {
+ public List<Vertex<I, V, E>> getAddedVertexList() {
return addedVertexList;
}
@@ -87,7 +85,7 @@ public class VertexMutations<I extends WritableComparable,
int addedVertexListSize = input.readInt();
for (int i = 0; i < addedVertexListSize; ++i) {
- Vertex<I, V, E, M> vertex =
+ Vertex<I, V, E> vertex =
WritableUtils.readVertexFromDataInput(input, getConf());
addedVertexList.add(vertex);
}
@@ -109,7 +107,7 @@ public class VertexMutations<I extends WritableComparable,
@Override
public void write(DataOutput output) throws IOException {
output.writeInt(addedVertexList.size());
- for (Vertex<I, V, E, M> vertex : addedVertexList) {
+ for (Vertex<I, V, E> vertex : addedVertexList) {
WritableUtils.writeVertexToDataOutput(output, vertex, getConf());
}
output.writeInt(removedVertexCount);
@@ -129,7 +127,7 @@ public class VertexMutations<I extends WritableComparable,
*
* @param vertex Vertex to be added
*/
- public void addVertex(Vertex<I, V, E, M> vertex) {
+ public void addVertex(Vertex<I, V, E> vertex) {
addedVertexList.add(vertex);
}
@@ -178,7 +176,7 @@ public class VertexMutations<I extends WritableComparable,
*
* @param vertexMutations Object to be added
*/
- public void addVertexMutations(VertexMutations<I, V, E, M> vertexMutations) {
+ public void addVertexMutations(VertexMutations<I, V, E> vertexMutations) {
addedVertexList.addAll(vertexMutations.getAddedVertexList());
removedVertexCount += vertexMutations.getRemovedVertexCount();
addedEdgeList.addAll(vertexMutations.getAddedEdgeList());
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/graph/VertexResolver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexResolver.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexResolver.java
index 1fc0ddc..b6659f4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexResolver.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexResolver.java
@@ -28,12 +28,10 @@ import org.apache.hadoop.io.WritableComparable;
* @param <I> Vertex id
* @param <V> Vertex data
* @param <E> Edge data
- * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
public interface VertexResolver<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- extends GraphStateAware<I, V, E, M> {
+ V extends Writable, E extends Writable> {
/**
* A vertex may have been removed, created zero or more times and had
* zero or more messages sent to it. This method will handle all situations
@@ -47,15 +45,8 @@ public interface VertexResolver<I extends WritableComparable,
* @return Vertex to be returned, if null, and a vertex currently exists
* it will be removed
*/
- Vertex<I, V, E, M> resolve(I vertexId,
- Vertex<I, V, E, M> vertex,
- VertexChanges<I, V, E, M> vertexChanges,
+ Vertex<I, V, E> resolve(I vertexId,
+ Vertex<I, V, E> vertex,
+ VertexChanges<I, V, E> vertexChanges,
boolean hasMessages);
-
- /**
- * Set the graph state.
- *
- * @param graphState Graph state saved.
- */
- void setGraphState(GraphState<I, V, E, M> graphState);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueFactory.java
index e62bb01..eb9197c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueFactory.java
@@ -36,7 +36,7 @@ public interface VertexValueFactory<V extends Writable> {
* @param configuration Configuration
*/
void initialize(
- ImmutableClassesGiraphConfiguration<?, V, ?, ?> configuration);
+ ImmutableClassesGiraphConfiguration<?, V, ?> configuration);
/**
* Create a new vertex value.
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java
index c03d718..ebc62f6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java
@@ -42,12 +42,11 @@ import org.apache.log4j.Logger;
* @param <I> Vertex id
* @param <V> Vertex value
* @param <E> Edge value
- * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
public class SuperstepHashPartitionerFactory<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- extends HashPartitionerFactory<I, V, E, M> {
+ V extends Writable, E extends Writable>
+ extends HashPartitionerFactory<I, V, E> {
/**
* Changes the {@link HashMasterPartitioner} to make ownership of the
* partitions based on a superstep. For testing only as it is totally
@@ -56,11 +55,10 @@ public class SuperstepHashPartitionerFactory<I extends WritableComparable,
* @param <I> vertex id
* @param <V> vertex data
* @param <E> edge data
- * @param <M> message data
*/
private static class SuperstepMasterPartition<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- extends HashMasterPartitioner<I, V, E, M> {
+ V extends Writable, E extends Writable>
+ extends HashMasterPartitioner<I, V, E> {
/** Class logger */
private static Logger LOG =
Logger.getLogger(SuperstepMasterPartition.class);
@@ -120,8 +118,8 @@ public class SuperstepHashPartitionerFactory<I extends WritableComparable,
}
@Override
- public MasterGraphPartitioner<I, V, E, M>
+ public MasterGraphPartitioner<I, V, E>
createMasterGraphPartitioner() {
- return new SuperstepMasterPartition<I, V, E, M>(getConf());
+ return new SuperstepMasterPartition<I, V, E>(getConf());
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java b/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
index 6d87bf2..a1af98f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
@SuppressWarnings("rawtypes")
public abstract class EdgeReader<I extends WritableComparable,
E extends Writable> extends DefaultImmutableClassesGiraphConfigurable<
- I, Writable, E, Writable> {
+ I, Writable, E> {
/**
* Use the input split and context to setup reading the edges.
* Guaranteed to be called prior to any other function.
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java
index 58d79a6..86e86d8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.util.ReflectionUtils;
*/
public abstract class GiraphInputFormat<I extends WritableComparable,
V extends Writable, E extends Writable> extends
- DefaultImmutableClassesGiraphConfigurable<I, V, E, Writable> {
+ DefaultImmutableClassesGiraphConfigurable<I, V, E> {
/**
* Get the list of input splits for the format.
*
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/SimpleVertexWriter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/SimpleVertexWriter.java b/giraph-core/src/main/java/org/apache/giraph/io/SimpleVertexWriter.java
index e4c3496..41a049f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/SimpleVertexWriter.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/SimpleVertexWriter.java
@@ -40,6 +40,6 @@ public interface SimpleVertexWriter<I extends WritableComparable,
* @throws IOException
* @throws InterruptedException
*/
- void writeVertex(Vertex<I, V, E, ?> vertex) throws IOException,
+ void writeVertex(Vertex<I, V, E> vertex) throws IOException,
InterruptedException;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/VertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/VertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/VertexOutputFormat.java
index 154f7e4..ad00a8e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/VertexOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/VertexOutputFormat.java
@@ -49,7 +49,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
public abstract class VertexOutputFormat<
I extends WritableComparable, V extends Writable,
E extends Writable> extends
- DefaultImmutableClassesGiraphConfigurable<I, V, E, Writable> {
+ DefaultImmutableClassesGiraphConfigurable<I, V, E> {
/**
* Create a vertex writer for a given split. The framework will call
* {@link VertexWriter#initialize(TaskAttemptContext)} before
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java b/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java
index 3f6bb3f..9695169 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java
@@ -37,7 +37,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
@SuppressWarnings("rawtypes")
public abstract class VertexReader<I extends WritableComparable,
V extends Writable, E extends Writable> extends
- DefaultImmutableClassesGiraphConfigurable<I, V, E, Writable> {
+ DefaultImmutableClassesGiraphConfigurable<I, V, E> {
/**
* Use the input split and context to setup reading the vertices.
* Guaranteed to be called prior to any other function.
@@ -68,7 +68,7 @@ public abstract class VertexReader<I extends WritableComparable,
* @throws IOException
* @throws InterruptedException
*/
- public abstract Vertex<I, V, E, ?> getCurrentVertex()
+ public abstract Vertex<I, V, E> getCurrentVertex()
throws IOException, InterruptedException;
/**
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/VertexValueReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/VertexValueReader.java b/giraph-core/src/main/java/org/apache/giraph/io/VertexValueReader.java
index 0b06a4a..70e721e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/VertexValueReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/VertexValueReader.java
@@ -39,9 +39,9 @@ public abstract class VertexValueReader<I extends WritableComparable,
}
@Override
- public final Vertex<I, V, Writable, ?> getCurrentVertex() throws IOException,
+ public final Vertex<I, V, Writable> getCurrentVertex() throws IOException,
InterruptedException {
- Vertex<I, V, Writable, ?> vertex = getConf().createVertex();
+ Vertex<I, V, Writable> vertex = getConf().createVertex();
vertex.initialize(getCurrentVertexId(), getCurrentVertexValue());
return vertex;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java b/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java
index a4285c1..69fdfc5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
@SuppressWarnings("rawtypes")
public abstract class VertexWriter<I extends WritableComparable,
V extends Writable, E extends Writable>
- extends DefaultImmutableClassesGiraphConfigurable<I, V, E, Writable>
+ extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
implements SimpleVertexWriter<I, V, E> {
/**
* Use the context to setup writing the vertices.
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultEdgeInputFilter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultEdgeInputFilter.java b/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultEdgeInputFilter.java
index ad52496..40ab922 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultEdgeInputFilter.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultEdgeInputFilter.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.io.WritableComparable;
*/
public class DefaultEdgeInputFilter<I extends WritableComparable,
E extends Writable>
- extends DefaultImmutableClassesGiraphConfigurable<I, Writable, E, Writable>
+ extends DefaultImmutableClassesGiraphConfigurable<I, Writable, E>
implements EdgeInputFilter<I, E> {
@Override
public boolean dropEdge(I sourceId, Edge<I, E> edge) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultVertexInputFilter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultVertexInputFilter.java b/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultVertexInputFilter.java
index 2976cbc..f35ade5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultVertexInputFilter.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultVertexInputFilter.java
@@ -32,10 +32,10 @@ import org.apache.hadoop.io.WritableComparable;
*/
public class DefaultVertexInputFilter<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
- extends DefaultImmutableClassesGiraphConfigurable<I, V, E, M>
- implements VertexInputFilter<I, V, E, M> {
+ extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
+ implements VertexInputFilter<I, V, E> {
@Override
- public boolean dropVertex(Vertex<I, V, E, M> vertex) {
+ public boolean dropVertex(Vertex<I, V, E> vertex) {
return false;
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/filters/VertexInputFilter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/filters/VertexInputFilter.java b/giraph-core/src/main/java/org/apache/giraph/io/filters/VertexInputFilter.java
index d9af103..c7d178e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/filters/VertexInputFilter.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/filters/VertexInputFilter.java
@@ -27,16 +27,15 @@ import org.apache.hadoop.io.WritableComparable;
* @param <I> Vertex ID
* @param <V> Vertex Value
* @param <E> Edge Value
- * @param <M> Message Value
*/
public interface VertexInputFilter<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> {
+ V extends Writable, E extends Writable> {
/**
* Whether to drop a vertex on input.
*
* @param vertex to check
* @return true if we should drop vertex
*/
- boolean dropVertex(Vertex<I, V, E, M> vertex);
+ boolean dropVertex(Vertex<I, V, E> vertex);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexOutputFormat.java
index f71ef25..8efbfd0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexOutputFormat.java
@@ -69,7 +69,7 @@ public class AdjacencyListTextVertexOutputFormat<I extends WritableComparable,
}
@Override
- public Text convertVertexToLine(Vertex<I, V, E, ?> vertex)
+ public Text convertVertexToLine(Vertex<I, V, E> vertex)
throws IOException {
StringBuffer sb = new StringBuffer(vertex.getId().toString());
sb.append(delimiter);
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java
index 6dd7468..bd69586 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java
@@ -75,7 +75,7 @@ public class IdWithValueTextOutputFormat<I extends WritableComparable,
}
@Override
- protected Text convertVertexToLine(Vertex<I, V, E, ?> vertex)
+ protected Text convertVertexToLine(Vertex<I, V, E> vertex)
throws IOException {
String first;
String second;
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntTextVertexValueInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntTextVertexValueInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntTextVertexValueInputFormat.java
index a7dbef8..1038a32 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntTextVertexValueInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntTextVertexValueInputFormat.java
@@ -35,11 +35,9 @@ import java.util.regex.Pattern;
* Each line consists of: id, value
*
* @param <E> Edge value
- * @param <M> Message data
*/
-public class IntIntTextVertexValueInputFormat<E extends Writable,
- M extends Writable> extends
- TextVertexValueInputFormat<IntWritable, IntWritable, E, M> {
+public class IntIntTextVertexValueInputFormat<E extends Writable> extends
+ TextVertexValueInputFormat<IntWritable, IntWritable, E> {
/** Separator for id and value */
private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexOutputFormat.java
index 7d8fcf6..a71cf96 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexOutputFormat.java
@@ -60,7 +60,7 @@ public class JsonBase64VertexOutputFormat<I extends WritableComparable,
protected class JsonBase64VertexWriter extends TextVertexWriterToEachLine {
@Override
- protected Text convertVertexToLine(Vertex<I, V, E, ?> vertex)
+ protected Text convertVertexToLine(Vertex<I, V, E> vertex)
throws IOException {
ByteArrayOutputStream outputStream =
new ByteArrayOutputStream();
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexInputFormat.java
index 7dfd607..112860c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexInputFormat.java
@@ -95,9 +95,8 @@ public class JsonLongDoubleFloatDoubleVertexInputFormat extends
}
@Override
- protected Vertex<LongWritable, DoubleWritable, FloatWritable,
- DoubleWritable> handleException(Text line, JSONArray jsonVertex,
- JSONException e) {
+ protected Vertex<LongWritable, DoubleWritable, FloatWritable>
+ handleException(Text line, JSONArray jsonVertex, JSONException e) {
throw new IllegalArgumentException(
"Couldn't get vertex from line " + line, e);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexOutputFormat.java
index d0a3305..13c80aa 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexOutputFormat.java
@@ -52,8 +52,7 @@ public class JsonLongDoubleFloatDoubleVertexOutputFormat extends
TextVertexWriterToEachLine {
@Override
public Text convertVertexToLine(
- Vertex<LongWritable, DoubleWritable,
- FloatWritable, ?> vertex
+ Vertex<LongWritable, DoubleWritable, FloatWritable> vertex
) throws IOException {
JSONArray jsonVertex = new JSONArray();
try {
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullVertexInputFormat.java
index c9390ba..d8abfdb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullVertexInputFormat.java
@@ -116,9 +116,9 @@ public class PseudoRandomIntNullVertexInputFormat extends
}
@Override
- public Vertex<IntWritable, FloatWritable, NullWritable, ?>
+ public Vertex<IntWritable, FloatWritable, NullWritable>
getCurrentVertex() throws IOException, InterruptedException {
- Vertex<IntWritable, FloatWritable, NullWritable, ?> vertex =
+ Vertex<IntWritable, FloatWritable, NullWritable> vertex =
getConf().createVertex();
int vertexId = startingVertexId + verticesRead;
OutEdges<IntWritable, NullWritable> edges =
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java
index 5d293eb..91a19e6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java
@@ -133,9 +133,9 @@ public class PseudoRandomVertexInputFormat extends
}
@Override
- public Vertex<LongWritable, DoubleWritable, DoubleWritable, ?>
+ public Vertex<LongWritable, DoubleWritable, DoubleWritable>
getCurrentVertex() throws IOException, InterruptedException {
- Vertex<LongWritable, DoubleWritable, DoubleWritable, ?>
+ Vertex<LongWritable, DoubleWritable, DoubleWritable>
vertex = getConf().createVertex();
long vertexId = startingVertexId + verticesRead;
// Seed on the vertex id to keep the vertex data the same when
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java
index 0f2d929..1d31f4f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
*/
@SuppressWarnings("rawtypes")
public class SequenceFileVertexInputFormat<I extends WritableComparable,
- V extends Writable, E extends Writable, X extends Vertex<I, V, E, ?>>
+ V extends Writable, E extends Writable, X extends Vertex<I, V, E>>
extends VertexInputFormat<I, V, E> {
/** Internal input format */
protected SequenceFileInputFormat<I, X> sequenceFileInputFormat =
@@ -68,7 +68,7 @@ public class SequenceFileVertexInputFormat<I extends WritableComparable,
* @param <X> Value type
*/
public static class SequenceFileVertexReader<I extends WritableComparable,
- V extends Writable, E extends Writable, X extends Vertex<I, V, E, ?>>
+ V extends Writable, E extends Writable, X extends Vertex<I, V, E>>
extends VertexReader<I, V, E> {
/** Internal record reader from {@link SequenceFileInputFormat} */
private final RecordReader<I, X> recordReader;
@@ -92,7 +92,7 @@ public class SequenceFileVertexInputFormat<I extends WritableComparable,
return recordReader.nextKeyValue();
}
- @Override public Vertex<I, V, E, ?> getCurrentVertex()
+ @Override public Vertex<I, V, E> getCurrentVertex()
throws IOException, InterruptedException {
return recordReader.getCurrentValue();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexOutputFormat.java
index c4ed65c..c6ff6d4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexOutputFormat.java
@@ -105,7 +105,7 @@ public abstract class SequenceFileVertexOutputFormat<
}
@Override
- public final void writeVertex(Vertex<I, V, E, ?> vertex) throws
+ public final void writeVertex(Vertex<I, V, E> vertex) throws
IOException, InterruptedException {
// Convert vertex id to type OK.
OK outKey = convertToSequenceFileKey(vertex.getId());
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/formats/TextDoubleDoubleAdjacencyListVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextDoubleDoubleAdjacencyListVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextDoubleDoubleAdjacencyListVertexInputFormat.java
index 6e62b71..17174a3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextDoubleDoubleAdjacencyListVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextDoubleDoubleAdjacencyListVertexInputFormat.java
@@ -21,7 +21,6 @@ import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.EdgeFactory;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -29,10 +28,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
* Class to read graphs stored as adjacency lists with ids represented by
* Strings and values as doubles. This is a good inputformat for reading
* graphs where the id types do not matter and can be stashed in a String.
- *
- * @param <M> Message type.
*/
-public class TextDoubleDoubleAdjacencyListVertexInputFormat<M extends Writable>
+public class TextDoubleDoubleAdjacencyListVertexInputFormat
extends AdjacencyListTextVertexInputFormat<Text, DoubleWritable,
DoubleWritable> {
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java
index b08e6f7..debdccc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java
@@ -154,10 +154,10 @@ public abstract class TextVertexInputFormat<I extends WritableComparable,
TextVertexReader {
@Override
- public final Vertex<I, V, E, ?> getCurrentVertex() throws IOException,
+ public final Vertex<I, V, E> getCurrentVertex() throws IOException,
InterruptedException {
Text line = getRecordReader().getCurrentValue();
- Vertex<I, V, E, ?> vertex = getConf().createVertex();
+ Vertex<I, V, E> vertex = getConf().createVertex();
vertex.initialize(getId(line), getValue(line), getEdges(line));
return vertex;
}
@@ -222,10 +222,10 @@ public abstract class TextVertexInputFormat<I extends WritableComparable,
}
@Override
- public final Vertex<I, V, E, ?> getCurrentVertex() throws IOException,
+ public final Vertex<I, V, E> getCurrentVertex() throws IOException,
InterruptedException {
Text line = getRecordReader().getCurrentValue();
- Vertex<I, V, E, ?> vertex;
+ Vertex<I, V, E> vertex;
T processed = preprocessLine(line);
vertex = getConf().createVertex();
vertex.initialize(getId(processed), getValue(processed),
@@ -306,11 +306,11 @@ public abstract class TextVertexInputFormat<I extends WritableComparable,
@SuppressWarnings("unchecked")
@Override
- public final Vertex<I, V, E, ?> getCurrentVertex() throws IOException,
+ public final Vertex<I, V, E> getCurrentVertex() throws IOException,
InterruptedException {
// Note we are reading from value only since key is the line number
Text line = getRecordReader().getCurrentValue();
- Vertex<I, V, E, ?> vertex;
+ Vertex<I, V, E> vertex;
T processed = null;
try {
processed = preprocessLine(line);
@@ -401,7 +401,7 @@ public abstract class TextVertexInputFormat<I extends WritableComparable,
* the exception thrown while reading the line
* @return the recovered/alternative vertex to be used
*/
- protected Vertex<I, V, E, ?> handleException(Text line, T processed, X e) {
+ protected Vertex<I, V, E> handleException(Text line, T processed, X e) {
throw new IllegalArgumentException(e);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java
index a3073f9..c91d543 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java
@@ -158,7 +158,7 @@ public abstract class TextVertexOutputFormat<I extends WritableComparable,
* @throws IOException
* exception that can be thrown while writing
*/
- protected abstract Text convertVertexToLine(Vertex<I, V, E, ?> vertex)
+ protected abstract Text convertVertexToLine(Vertex<I, V, E> vertex)
throws IOException;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java
index 6d133ae..e960444 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java
@@ -39,11 +39,10 @@ import java.util.List;
* @param <I> Vertex index value
* @param <V> Vertex value
* @param <E> Edge value
- * @param <M> Message value
*/
@SuppressWarnings("rawtypes")
public abstract class TextVertexValueInputFormat<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
+ V extends Writable, E extends Writable>
extends VertexValueInputFormat<I, V> {
/** Uses the GiraphTextInputFormat to do everything */
protected GiraphTextInputFormat textInputFormat = new GiraphTextInputFormat();
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/EdgeInputFormatDescription.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/EdgeInputFormatDescription.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/EdgeInputFormatDescription.java
index 569cee9..4b48e63 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/EdgeInputFormatDescription.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/EdgeInputFormatDescription.java
@@ -78,11 +78,11 @@ public class EdgeInputFormatDescription<I extends WritableComparable,
* @param conf Configuration which we want to create a copy from
* @return Copy of configuration
*/
- private ImmutableClassesGiraphConfiguration<I, Writable, E, Writable>
+ private ImmutableClassesGiraphConfiguration<I, Writable, E>
createConfigurationCopy(
- ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> conf) {
- ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> confCopy =
- new ImmutableClassesGiraphConfiguration<I, Writable, E, Writable>(conf);
+ ImmutableClassesGiraphConfiguration<I, Writable, E> conf) {
+ ImmutableClassesGiraphConfiguration<I, Writable, E> confCopy =
+ new ImmutableClassesGiraphConfiguration<I, Writable, E>(conf);
confCopy.setEdgeInputFormatClass(getInputFormatClass());
putParametersToConfiguration(confCopy);
return confCopy;
@@ -130,13 +130,13 @@ public class EdgeInputFormatDescription<I extends WritableComparable,
*/
public static <I extends WritableComparable,
E extends Writable> List<EdgeInputFormat<I, E>> createEdgeInputFormats(
- ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> conf) {
+ ImmutableClassesGiraphConfiguration<I, Writable, E> conf) {
List<EdgeInputFormatDescription<I, E>> descriptions =
getEdgeInputFormatDescriptions(conf);
List<EdgeInputFormat<I, E>> edgeInputFormats =
Lists.newArrayListWithCapacity(descriptions.size());
for (EdgeInputFormatDescription<I, E> description : descriptions) {
- ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> confCopy =
+ ImmutableClassesGiraphConfiguration<I, Writable, E> confCopy =
description.createConfigurationCopy(conf);
edgeInputFormats.add(confCopy.createWrappedEdgeInputFormat());
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java
index c377fbc..fa8839b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java
@@ -48,7 +48,7 @@ public class MultiEdgeInputFormat<I extends WritableComparable,
@Override
public void setConf(
- ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> conf) {
+ ImmutableClassesGiraphConfiguration<I, Writable, E> conf) {
super.setConf(conf);
edgeInputFormats =
EdgeInputFormatDescription.createEdgeInputFormats(getConf());
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java
index 72929d9..e851e38 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java
@@ -49,7 +49,7 @@ public class MultiVertexInputFormat<I extends WritableComparable,
@Override
public void setConf(
- ImmutableClassesGiraphConfiguration<I, V, E, Writable> conf) {
+ ImmutableClassesGiraphConfiguration<I, V, E> conf) {
super.setConf(conf);
vertexInputFormats =
VertexInputFormatDescription.createVertexInputFormats(getConf());
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/VertexInputFormatDescription.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/VertexInputFormatDescription.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/VertexInputFormatDescription.java
index bdd5a74..1487749 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/VertexInputFormatDescription.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/VertexInputFormatDescription.java
@@ -80,11 +80,11 @@ public class VertexInputFormatDescription<I extends WritableComparable,
* @param conf Configuration which we want to create a copy from
* @return Copy of configuration
*/
- private ImmutableClassesGiraphConfiguration<I, V, E, Writable>
+ private ImmutableClassesGiraphConfiguration<I, V, E>
createConfigurationCopy(
- ImmutableClassesGiraphConfiguration<I, V, E, Writable> conf) {
- ImmutableClassesGiraphConfiguration<I, V, E, Writable> confCopy =
- new ImmutableClassesGiraphConfiguration<I, V, E, Writable>(conf);
+ ImmutableClassesGiraphConfiguration<I, V, E> conf) {
+ ImmutableClassesGiraphConfiguration<I, V, E> confCopy =
+ new ImmutableClassesGiraphConfiguration<I, V, E>(conf);
confCopy.setVertexInputFormatClass(getInputFormatClass());
putParametersToConfiguration(confCopy);
return confCopy;
@@ -136,13 +136,13 @@ public class VertexInputFormatDescription<I extends WritableComparable,
public static <I extends WritableComparable, V extends Writable,
E extends Writable>
List<VertexInputFormat<I, V, E>> createVertexInputFormats(
- ImmutableClassesGiraphConfiguration<I, V, E, Writable> conf) {
+ ImmutableClassesGiraphConfiguration<I, V, E> conf) {
List<VertexInputFormatDescription<I, V, E>> descriptions =
getVertexInputFormatDescriptions(conf);
List<VertexInputFormat<I, V, E>> vertexInputFormats =
Lists.newArrayListWithCapacity(descriptions.size());
for (VertexInputFormatDescription<I, V, E> description : descriptions) {
- ImmutableClassesGiraphConfiguration<I, V, E, Writable> confCopy =
+ ImmutableClassesGiraphConfiguration<I, V, E> confCopy =
description.createConfigurationCopy(conf);
vertexInputFormats.add(confCopy.createWrappedVertexInputFormat());
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java
index c0a2cd1..aae7a72 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java
@@ -50,7 +50,7 @@ public class WrappedEdgeReader<I extends WritableComparable,
* @param conf Configuration
*/
public WrappedEdgeReader(EdgeReader<I, E> baseEdgeReader,
- ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> conf) {
+ ImmutableClassesGiraphConfiguration<I, Writable, E> conf) {
this.baseEdgeReader = baseEdgeReader;
super.setConf(conf);
baseEdgeReader.setConf(conf);
@@ -58,7 +58,7 @@ public class WrappedEdgeReader<I extends WritableComparable,
@Override
public void setConf(
- ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> conf) {
+ ImmutableClassesGiraphConfiguration<I, Writable, E> conf) {
// We don't want to use external configuration
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexOutputFormat.java
index 8110209..bffa330 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexOutputFormat.java
@@ -70,7 +70,7 @@ public class WrappedVertexOutputFormat<I extends WritableComparable,
return new VertexWriter<I, V, E>() {
@Override
public void setConf(
- ImmutableClassesGiraphConfiguration<I, V, E, Writable> conf) {
+ ImmutableClassesGiraphConfiguration<I, V, E> conf) {
super.setConf(conf);
vertexWriter.setConf(conf);
}
@@ -91,7 +91,7 @@ public class WrappedVertexOutputFormat<I extends WritableComparable,
@Override
public void writeVertex(
- Vertex<I, V, E, ?> vertex) throws IOException, InterruptedException {
+ Vertex<I, V, E> vertex) throws IOException, InterruptedException {
vertexWriter.writeVertex(vertex);
}
};
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java
index 3a8ac50..54adfec 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java
@@ -51,7 +51,7 @@ public class WrappedVertexReader<I extends WritableComparable,
* @param conf Configuration
*/
public WrappedVertexReader(VertexReader<I, V, E> baseVertexReader,
- ImmutableClassesGiraphConfiguration<I, V, E, Writable> conf) {
+ ImmutableClassesGiraphConfiguration<I, V, E> conf) {
this.baseVertexReader = baseVertexReader;
super.setConf(conf);
baseVertexReader.setConf(conf);
@@ -59,7 +59,7 @@ public class WrappedVertexReader<I extends WritableComparable,
@Override
public void setConf(
- ImmutableClassesGiraphConfiguration<I, V, E, Writable> conf) {
+ ImmutableClassesGiraphConfiguration<I, V, E> conf) {
// We don't want to use external configuration
}
@@ -76,7 +76,7 @@ public class WrappedVertexReader<I extends WritableComparable,
}
@Override
- public Vertex<I, V, E, ?> getCurrentVertex() throws IOException,
+ public Vertex<I, V, E> getCurrentVertex() throws IOException,
InterruptedException {
return baseVertexReader.getCurrentVertex();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/iterables/EdgeReaderWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/iterables/EdgeReaderWrapper.java b/giraph-core/src/main/java/org/apache/giraph/io/iterables/EdgeReaderWrapper.java
index efd8fe7..f61ac45 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/iterables/EdgeReaderWrapper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/iterables/EdgeReaderWrapper.java
@@ -53,7 +53,7 @@ public class EdgeReaderWrapper<I extends WritableComparable,
@Override
public void setConf(
- ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> conf) {
+ ImmutableClassesGiraphConfiguration<I, Writable, E> conf) {
super.setConf(conf);
conf.configureIfPossible(edgeReader);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/iterables/VertexReaderWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/iterables/VertexReaderWrapper.java b/giraph-core/src/main/java/org/apache/giraph/io/iterables/VertexReaderWrapper.java
index 614f945..ca35c51 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/iterables/VertexReaderWrapper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/iterables/VertexReaderWrapper.java
@@ -38,23 +38,23 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
public class VertexReaderWrapper<I extends WritableComparable,
V extends Writable, E extends Writable> extends VertexReader<I, V, E> {
/** Wrapped vertex reader */
- private GiraphReader<Vertex<I, V, E, ?>> vertexReader;
+ private GiraphReader<Vertex<I, V, E>> vertexReader;
/** {@link VertexReader}-like wrapper of {@link #vertexReader} */
- private IteratorToReaderWrapper<Vertex<I, V, E, ?>> iterator;
+ private IteratorToReaderWrapper<Vertex<I, V, E>> iterator;
/**
* Constructor
*
* @param vertexReader GiraphReader for vertices to wrap
*/
- public VertexReaderWrapper(GiraphReader<Vertex<I, V, E, ?>> vertexReader) {
+ public VertexReaderWrapper(GiraphReader<Vertex<I, V, E>> vertexReader) {
this.vertexReader = vertexReader;
- iterator = new IteratorToReaderWrapper<Vertex<I, V, E, ?>>(vertexReader);
+ iterator = new IteratorToReaderWrapper<Vertex<I, V, E>>(vertexReader);
}
@Override
public void setConf(
- ImmutableClassesGiraphConfiguration<I, V, E, Writable> conf) {
+ ImmutableClassesGiraphConfiguration<I, V, E> conf) {
super.setConf(conf);
conf.configureIfPossible(vertexReader);
}
@@ -65,7 +65,7 @@ public class VertexReaderWrapper<I extends WritableComparable,
}
@Override
- public Vertex<I, V, E, ?> getCurrentVertex() throws IOException,
+ public Vertex<I, V, E> getCurrentVertex() throws IOException,
InterruptedException {
return iterator.getCurrentObject();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java
index a8dff87..452d93a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java
@@ -47,7 +47,7 @@ public class MultiThreadedSuperstepOutput<I extends WritableComparable,
/** Mapper context */
private final Mapper<?, ?, ?, ?>.Context context;
/** Configuration */
- private ImmutableClassesGiraphConfiguration<I, V, E, ?> configuration;
+ private ImmutableClassesGiraphConfiguration<I, V, E> configuration;
/** Vertex output format, used to get new vertex writers */
private final VertexOutputFormat<I, V, E> vertexOutputFormat;
/**
@@ -65,7 +65,7 @@ public class MultiThreadedSuperstepOutput<I extends WritableComparable,
* @param context Mapper context
*/
public MultiThreadedSuperstepOutput(
- ImmutableClassesGiraphConfiguration<I, V, E, ?> conf,
+ ImmutableClassesGiraphConfiguration<I, V, E> conf,
Mapper<?, ?, ?, ?>.Context context) {
this.configuration = conf;
vertexOutputFormat = conf.createWrappedVertexOutputFormat();
@@ -80,9 +80,7 @@ public class MultiThreadedSuperstepOutput<I extends WritableComparable,
if (availableVertexWriters.isEmpty()) {
try {
vertexWriter = vertexOutputFormat.createVertexWriter(context);
- vertexWriter.setConf(
- (ImmutableClassesGiraphConfiguration<I, V, E, Writable>)
- configuration);
+ vertexWriter.setConf(configuration);
vertexWriter.initialize(context);
} catch (IOException e) {
throw new IllegalStateException("getVertexWriter: " +
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/NoOpSuperstepOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/NoOpSuperstepOutput.java b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/NoOpSuperstepOutput.java
index 82684b2..be981cf 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/NoOpSuperstepOutput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/NoOpSuperstepOutput.java
@@ -40,7 +40,7 @@ public class NoOpSuperstepOutput<I extends WritableComparable,
public SimpleVertexWriter<I, V, E> getVertexWriter() {
return new SimpleVertexWriter<I, V, E>() {
@Override
- public void writeVertex(Vertex<I, V, E, ?> vertex) throws IOException,
+ public void writeVertex(Vertex<I, V, E> vertex) throws IOException,
InterruptedException {
}
};
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SynchronizedSuperstepOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SynchronizedSuperstepOutput.java b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SynchronizedSuperstepOutput.java
index f94bd56..7f233e0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SynchronizedSuperstepOutput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SynchronizedSuperstepOutput.java
@@ -57,14 +57,13 @@ public class SynchronizedSuperstepOutput<I extends WritableComparable,
*/
@SuppressWarnings("unchecked")
public SynchronizedSuperstepOutput(
- ImmutableClassesGiraphConfiguration<I, V, E, ?> conf,
+ ImmutableClassesGiraphConfiguration<I, V, E> conf,
Mapper<?, ?, ?, ?>.Context context) {
this.context = context;
try {
vertexWriter =
conf.createWrappedVertexOutputFormat().createVertexWriter(context);
- vertexWriter.setConf(
- (ImmutableClassesGiraphConfiguration<I, V, E, Writable>) conf);
+ vertexWriter.setConf(conf);
vertexWriter.initialize(context);
} catch (IOException e) {
throw new IllegalStateException("SynchronizedSuperstepOutput: " +
@@ -76,7 +75,7 @@ public class SynchronizedSuperstepOutput<I extends WritableComparable,
simpleVertexWriter = new SimpleVertexWriter<I, V, E>() {
@Override
public synchronized void writeVertex(
- Vertex<I, V, E, ?> vertex) throws IOException, InterruptedException {
+ Vertex<I, V, E> vertex) throws IOException, InterruptedException {
vertexWriter.writeVertex(vertex);
}
};
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
index cc6b126..de17157 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
@@ -19,12 +19,12 @@
package org.apache.giraph.job;
import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.graph.Computation;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.OutEdges;
import org.apache.giraph.graph.DefaultVertexResolver;
import org.apache.giraph.graph.DefaultVertexValueFactory;
-import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.graph.VertexValueFactory;
import org.apache.giraph.io.EdgeInputFormat;
@@ -51,10 +51,12 @@ import static org.apache.giraph.conf.GiraphConstants.VERTEX_RESOLVER_CLASS;
* @param <I> the Vertex ID type
* @param <V> the Vertex Value type
* @param <E> the Edge Value type
- * @param <M> the Message type
+ * @param <M1> the incoming Message type
+ * @param <M2> the outgoing Message type
*/
public class GiraphConfigurationValidator<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> {
+ V extends Writable, E extends Writable, M1 extends Writable,
+ M2 extends Writable> {
/**
* Class logger object.
*/
@@ -67,8 +69,8 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
private static final int VALUE_PARAM_INDEX = 1;
/** E param vertex index in classList */
private static final int EDGE_PARAM_INDEX = 2;
- /** M param vertex index in classList */
- private static final int MSG_PARAM_INDEX = 3;
+ /** M2 param vertex index in classList */
+ private static final int OUTGOING_MSG_PARAM_INDEX = 4;
/** M param vertex combiner index in classList */
private static final int MSG_COMBINER_PARAM_INDEX = 1;
/** E param edge input format index in classList */
@@ -80,12 +82,12 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
/** Vertex Index Type */
private Type vertexIndexType;
- /** Vertex Index Type */
+ /** Vertex Value Type */
private Type vertexValueType;
- /** Vertex Index Type */
+ /** Edge Value Type */
private Type edgeValueType;
- /** Vertex Index Type */
- private Type messageValueType;
+ /** Outgoing Message Type */
+ private Type outgoingMessageValueType;
/**
* The Configuration object for use in the validation test.
@@ -110,14 +112,14 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
*/
public void validateConfiguration() {
checkConfiguration();
- Class<? extends Vertex<I, V, E, M>> vertexClass =
- conf.getVertexClass();
+ Class<? extends Computation<I, V, E, M1, M2>> computationClass =
+ conf.getComputationClass();
List<Class<?>> classList = ReflectionUtils.getTypeArguments(
- Vertex.class, vertexClass);
+ Computation.class, computationClass);
vertexIndexType = classList.get(ID_PARAM_INDEX);
vertexValueType = classList.get(VALUE_PARAM_INDEX);
edgeValueType = classList.get(EDGE_PARAM_INDEX);
- messageValueType = classList.get(MSG_PARAM_INDEX);
+ outgoingMessageValueType = classList.get(OUTGOING_MSG_PARAM_INDEX);
verifyOutEdgesGenericTypes();
verifyVertexInputFormatGenericTypes();
verifyEdgeInputFormatGenericTypes();
@@ -146,9 +148,9 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
throw new IllegalArgumentException("checkConfiguration: No valid " +
GiraphConstants.MIN_WORKERS);
}
- if (conf.getVertexClass() == null) {
+ if (conf.getComputationClass() == null) {
throw new IllegalArgumentException("checkConfiguration: Null " +
- GiraphConstants.VERTEX_CLASS.getKey());
+ GiraphConstants.COMPUTATION_CLASS.getKey());
}
if (conf.getVertexInputFormatClass() == null &&
conf.getEdgeInputFormatClass() == null) {
@@ -281,7 +283,7 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
/** If there is a combiner type, verify its generic params match the job. */
private void verifyVertexCombinerGenericTypes() {
- Class<? extends Combiner<I, M>> vertexCombinerClass =
+ Class<? extends Combiner<I, M2>> vertexCombinerClass =
conf.getCombinerClass();
if (vertexCombinerClass != null) {
List<Class<?>> classList =
@@ -293,10 +295,11 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
"vertex - " + vertexIndexType +
", vertex combiner - " + classList.get(ID_PARAM_INDEX));
}
- if (!messageValueType.equals(classList.get(MSG_COMBINER_PARAM_INDEX))) {
+ if (!outgoingMessageValueType.equals(
+ classList.get(MSG_COMBINER_PARAM_INDEX))) {
throw new IllegalArgumentException(
"checkClassTypes: Message value types don't match, " +
- "vertex - " + messageValueType +
+ "vertex - " + outgoingMessageValueType +
", vertex combiner - " + classList.get(MSG_COMBINER_PARAM_INDEX));
}
}
@@ -360,14 +363,14 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
/** If there is a vertex resolver,
* validate the generic parameter types. */
private void verifyVertexResolverGenericTypes() {
- Class<? extends VertexResolver<I, V, E, M>>
+ Class<? extends VertexResolver<I, V, E>>
vrClass = conf.getVertexResolverClass();
if (!DefaultVertexResolver.class.isAssignableFrom(vrClass)) {
return;
}
- Class<? extends DefaultVertexResolver<I, V, E, M>>
+ Class<? extends DefaultVertexResolver<I, V, E>>
dvrClass =
- (Class<? extends DefaultVertexResolver<I, V, E, M>>) vrClass;
+ (Class<? extends DefaultVertexResolver<I, V, E>>) vrClass;
List<Class<?>> classList =
ReflectionUtils.getTypeArguments(
DefaultVertexResolver.class, dvrClass);
@@ -392,13 +395,6 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
"vertex - " + edgeValueType +
", vertex resolver - " + classList.get(EDGE_PARAM_INDEX));
}
- if (classList.get(MSG_PARAM_INDEX) != null &&
- !messageValueType.equals(classList.get(MSG_PARAM_INDEX))) {
- throw new IllegalArgumentException(
- "checkClassTypes: Message value types don't match, " +
- "vertex - " + messageValueType +
- ", vertex resolver - " + classList.get(MSG_PARAM_INDEX));
- }
}
}