You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/10/03 03:37:12 UTC
svn commit: r1393251 - in /giraph/trunk: ./
src/main/java/org/apache/giraph/comm/
src/main/java/org/apache/giraph/comm/netty/
src/main/java/org/apache/giraph/graph/
src/main/java/org/apache/giraph/graph/partition/
src/test/java/org/apache/giraph/comm/ ...
Author: aching
Date: Wed Oct 3 01:37:11 2012
New Revision: 1393251
URL: http://svn.apache.org/viewvc?rev=1393251&view=rev
Log:
GIRAPH-355: Partition.readFields crashes
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientServer.java
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
giraph/trunk/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java
giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java
giraph/trunk/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java
giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java
giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java
giraph/trunk/src/test/java/org/apache/giraph/graph/partition/TestPartitionStores.java
Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1393251&r1=1393250&r2=1393251&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Wed Oct 3 01:37:11 2012
@@ -1,6 +1,9 @@
Giraph Change Log
Release 0.2.0 - unreleased
+
+ GIRAPH-355: Partition.readFields crashes. (maja via aching)
+
GIRAPH-354: Giraph Formats should use hcatalog-core. (nitayj via
aching)
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java?rev=1393251&r1=1393250&r2=1393251&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java Wed Oct 3 01:37:11 2012
@@ -28,6 +28,7 @@ import org.apache.giraph.graph.partition
import org.apache.giraph.graph.partition.SimplePartitionStore;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
@@ -70,20 +71,24 @@ public class ServerData<I extends Writab
*
* @param configuration Configuration
* @param messageStoreFactory Factory for message stores
+ * @param context Mapper context
*/
public ServerData(
ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
MessageStoreFactory<I, M, MessageStoreByPartition<I, M>>
- messageStoreFactory) {
+ messageStoreFactory,
+ Mapper<?, ?, ?, ?>.Context context) {
this.messageStoreFactory = messageStoreFactory;
currentMessageStore = messageStoreFactory.newStore();
incomingMessageStore = messageStoreFactory.newStore();
if (configuration.getBoolean(GiraphConfiguration.USE_OUT_OF_CORE_GRAPH,
GiraphConfiguration.USE_OUT_OF_CORE_GRAPH_DEFAULT)) {
- partitionStore = new DiskBackedPartitionStore<I, V, E, M>(configuration);
+ partitionStore =
+ new DiskBackedPartitionStore<I, V, E, M>(configuration, context);
} else {
- partitionStore = new SimplePartitionStore<I, V, E, M>(configuration);
+ partitionStore =
+ new SimplePartitionStore<I, V, E, M>(configuration, context);
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientServer.java?rev=1393251&r1=1393250&r2=1393251&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientServer.java Wed Oct 3 01:37:11 2012
@@ -63,8 +63,7 @@ public class NettyWorkerClientServer<I e
ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
CentralizedServiceWorker<I, V, E, M> service) {
server = new NettyWorkerServer<I, V, E, M>(
- configuration,
- service);
+ configuration, service, context);
client = new NettyWorkerClient<I, V, E, M>(context,
configuration, service,
((NettyWorkerServer<I, V, E, M>) server).getServerData());
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java?rev=1393251&r1=1393250&r2=1393251&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java Wed Oct 3 01:37:11 2012
@@ -38,6 +38,7 @@ import org.apache.giraph.graph.VertexRes
import org.apache.giraph.graph.partition.Partition;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
import com.google.common.collect.Sets;
@@ -74,9 +75,11 @@ public class NettyWorkerServer<I extends
*
* @param conf Configuration
* @param service Service to get partition mappings
+ * @param context Mapper context
*/
public NettyWorkerServer(ImmutableClassesGiraphConfiguration conf,
- CentralizedServiceWorker<I, V, E, M> service) {
+ CentralizedServiceWorker<I, V, E, M> service,
+ Mapper<?, ?, ?, ?>.Context context) {
this.conf = conf;
this.service = service;
@@ -85,7 +88,7 @@ public class NettyWorkerServer<I extends
GiraphConfiguration.USE_OUT_OF_CORE_MESSAGES_DEFAULT);
if (!useOutOfCoreMessaging) {
serverData = new ServerData<I, V, E, M>(
- conf, SimpleMessageStore.newFactory(service, conf));
+ conf, SimpleMessageStore.newFactory(service, conf), context);
} else {
int maxMessagesInMemory = conf.getInt(
GiraphConfiguration.MAX_MESSAGES_IN_MEMORY,
@@ -98,7 +101,7 @@ public class NettyWorkerServer<I extends
MessageStoreFactory<I, M, MessageStoreByPartition<I, M>>
storeFactory = DiskBackedMessageStoreByPartition.newFactory(service,
maxMessagesInMemory, partitionStoreFactory);
- serverData = new ServerData<I, V, E, M>(conf, storeFactory);
+ serverData = new ServerData<I, V, E, M>(conf, storeFactory, context);
}
nettyServer = new NettyServer(conf,
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1393251&r1=1393250&r2=1393251&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Wed Oct 3 01:37:11 2012
@@ -182,7 +182,8 @@ public class BspServiceWorker<I extends
workerPartitionStore = null;
} else {
workerPartitionStore =
- new SimplePartitionStore<I, V, E, M>(getConfiguration());
+ new SimplePartitionStore<I, V, E, M>(getConfiguration(),
+ getContext());
}
}
@@ -474,7 +475,8 @@ public class BspServiceWorker<I extends
if (partition == null) {
partition = new Partition<I, V, E, M>(
getConfiguration(),
- partitionOwner.getPartitionId());
+ partitionOwner.getPartitionId(),
+ getContext());
inputSplitCache.put(partitionOwner, partition);
}
Vertex<I, V, E, M> oldVertex =
@@ -674,7 +676,7 @@ public class BspServiceWorker<I extends
partitionOwner.getPartitionId())) {
Partition<I, V, E, M> partition =
new Partition<I, V, E, M>(getConfiguration(),
- partitionOwner.getPartitionId());
+ partitionOwner.getPartitionId(), getContext());
getPartitionStore().addPartition(partition);
}
}
@@ -1324,7 +1326,8 @@ public class BspServiceWorker<I extends
Partition<I, V, E, M> partition =
new Partition<I, V, E, M>(
getConfiguration(),
- partitionId);
+ partitionId,
+ getContext());
DataInputStream partitionsStream =
getFs().open(new Path(partitionsFile));
if (partitionsStream.skip(startPos) != startPos) {
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java?rev=1393251&r1=1393250&r2=1393251&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java Wed Oct 3 01:37:11 2012
@@ -23,6 +23,7 @@ import org.apache.giraph.ImmutableClasse
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
import com.google.common.collect.Iterables;
@@ -73,15 +74,20 @@ public class DiskBackedPartitionStore<I
/** Locks for accessing and modifying partitions. */
private final ConcurrentMap<Integer, Lock> partitionLocks =
Maps.newConcurrentMap();
+ /** Context used to report progress */
+ private final Mapper<?, ?, ?, ?>.Context context;
/**
* Constructor.
*
* @param conf Configuration
+ * @param context Mapper context
*/
public DiskBackedPartitionStore(
- ImmutableClassesGiraphConfiguration<I, V, E, M> conf) {
+ ImmutableClassesGiraphConfiguration<I, V, E, M> conf,
+ Mapper<?, ?, ?, ?>.Context context) {
this.conf = conf;
+ this.context = context;
// We must be able to hold at least one partition in memory
maxInMemoryPartitions = Math.max(1,
conf.getInt(GiraphConfiguration.MAX_PARTITIONS_IN_MEMORY,
@@ -156,7 +162,7 @@ public class DiskBackedPartitionStore<I
private Partition<I, V, E, M> readPartition(Integer partitionId)
throws IOException {
Partition<I, V, E, M> partition = new Partition<I, V, E, M>(conf,
- partitionId);
+ partitionId, context);
File file = new File(getPartitionPath(partitionId));
DataInputStream inputStream = new DataInputStream(
new BufferedInputStream(new FileInputStream(file)));
@@ -284,7 +290,8 @@ public class DiskBackedPartitionStore<I
} else {
Lock lock = createLock(partitionId);
if (lock != null) {
- addPartitionNoLock(new Partition<I, V, E, M>(conf, partitionId));
+ addPartitionNoLock(
+ new Partition<I, V, E, M>(conf, partitionId, context));
lock.unlock();
} else {
// Another thread is already creating the partition,
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java?rev=1393251&r1=1393250&r2=1393251&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java Wed Oct 3 01:37:11 2012
@@ -23,6 +23,7 @@ import org.apache.giraph.ImmutableClasse
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
import com.google.common.collect.Maps;
@@ -52,17 +53,22 @@ public class Partition<I extends Writabl
private final int id;
/** Vertex map for this range (keyed by index) */
private final ConcurrentMap<I, Vertex<I, V, E, M>> vertexMap;
+ /** Context used to report progress */
+ private final Mapper<?, ?, ?, ?>.Context context;
/**
* Constructor.
*
* @param conf Configuration.
* @param id Partition id.
+ * @param context Mapper context
*/
public Partition(ImmutableClassesGiraphConfiguration<I, V, E, M> conf,
- int id) {
+ int id,
+ Mapper<?, ?, ?, ?>.Context context) {
this.conf = conf;
this.id = id;
+ this.context = context;
if (conf.getBoolean(GiraphConfiguration.USE_OUT_OF_CORE_MESSAGES,
GiraphConfiguration.USE_OUT_OF_CORE_MESSAGES_DEFAULT)) {
vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E, M>>();
@@ -154,7 +160,7 @@ public class Partition<I extends Writabl
int vertices = input.readInt();
for (int i = 0; i < vertices; ++i) {
Vertex<I, V, E, M> vertex = conf.createVertex();
- vertex.getContext().progress();
+ context.progress();
vertex.readFields(input);
if (vertexMap.put(vertex.getId(), vertex) != null) {
throw new IllegalStateException(
@@ -168,7 +174,7 @@ public class Partition<I extends Writabl
public void write(DataOutput output) throws IOException {
output.writeInt(vertexMap.size());
for (Vertex vertex : vertexMap.values()) {
- vertex.getContext().progress();
+ context.progress();
vertex.write(output);
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java?rev=1393251&r1=1393250&r2=1393251&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java Wed Oct 3 01:37:11 2012
@@ -22,6 +22,7 @@ import org.apache.giraph.ImmutableClasse
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
import com.google.common.collect.Maps;
@@ -44,15 +45,20 @@ public class SimplePartitionStore<I exte
Maps.newConcurrentMap();
/** Configuration. */
private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+ /** Context used to report progress */
+ private final Mapper<?, ?, ?, ?>.Context context;
/**
* Constructor.
*
* @param conf Configuration
+ * @param context Mapper context
*/
public SimplePartitionStore(
- ImmutableClassesGiraphConfiguration<I, V, E, M> conf) {
+ ImmutableClassesGiraphConfiguration<I, V, E, M> conf,
+ Mapper<?, ?, ?, ?>.Context context) {
this.conf = conf;
+ this.context = context;
}
@Override
@@ -69,7 +75,7 @@ public class SimplePartitionStore<I exte
Partition<I, V, E, M> partition = partitions.get(partitionId);
if (partition == null) {
Partition<I, V, E, M> newPartition = new Partition<I, V, E, M>(conf,
- partitionId);
+ partitionId, context);
partition = partitions.putIfAbsent(partitionId, newPartition);
if (partition == null) {
partition = newPartition;
Modified: giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java?rev=1393251&r1=1393250&r2=1393251&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java Wed Oct 3 01:37:11 2012
@@ -19,22 +19,16 @@
package org.apache.giraph.comm;
import com.google.common.collect.Sets;
-import java.util.Iterator;
import java.util.Set;
import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.benchmark.EdgeListVertexPageRankBenchmark;
-import org.apache.giraph.benchmark.PageRankBenchmark;
import org.apache.giraph.comm.messages.SimpleMessageStore;
import org.apache.giraph.comm.netty.handler.RequestServerHandler;
import org.apache.giraph.comm.netty.NettyClient;
import org.apache.giraph.comm.netty.NettyServer;
import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
import org.apache.giraph.graph.EdgeListVertex;
-import org.apache.giraph.graph.MutableVertex;
-import org.apache.giraph.graph.Vertex;
import org.apache.giraph.utils.MockUtils;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.junit.Before;
@@ -80,9 +74,11 @@ public class ConnectionTest {
when(context.getConfiguration()).thenReturn(conf);
ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
- new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
- (conf, SimpleMessageStore.newFactory(
- MockUtils.mockServiceGetVertexPartitionOwner(1), conf));
+ new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>(
+ conf,
+ SimpleMessageStore.newFactory(
+ MockUtils.mockServiceGetVertexPartitionOwner(1), conf),
+ context);
NettyServer server =
new NettyServer(conf,
new WorkerRequestServerHandler.Factory(serverData));
@@ -107,9 +103,11 @@ public class ConnectionTest {
when(context.getConfiguration()).thenReturn(conf);
ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
- new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
- (conf, SimpleMessageStore.newFactory(
- MockUtils.mockServiceGetVertexPartitionOwner(1), conf));
+ new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>(
+ conf,
+ SimpleMessageStore.newFactory(
+ MockUtils.mockServiceGetVertexPartitionOwner(1), conf),
+ context);
RequestServerHandler.Factory requestServerHandlerFactory =
new WorkerRequestServerHandler.Factory(serverData);
@@ -145,9 +143,11 @@ public class ConnectionTest {
when(context.getConfiguration()).thenReturn(conf);
ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
- new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
- (conf, SimpleMessageStore.newFactory(
- MockUtils.mockServiceGetVertexPartitionOwner(1), conf));
+ new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>(
+ conf,
+ SimpleMessageStore.newFactory(
+ MockUtils.mockServiceGetVertexPartitionOwner(1), conf),
+ context);
NettyServer server = new NettyServer(conf,
new WorkerRequestServerHandler.Factory(serverData));
server.start();
Modified: giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java?rev=1393251&r1=1393250&r2=1393251&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java Wed Oct 3 01:37:11 2012
@@ -27,12 +27,8 @@ import org.apache.giraph.comm.netty.hand
import org.apache.giraph.comm.requests.SendPartitionMessagesRequest;
import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.giraph.graph.EdgeListVertex;
-import org.apache.giraph.graph.Vertex;
import org.apache.giraph.utils.MockUtils;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.junit.Before;
import org.junit.Test;
@@ -132,9 +128,11 @@ public class RequestFailureTest {
public void send2Requests() throws IOException {
// Start the service
serverData =
- new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
- (conf, SimpleMessageStore.newFactory(
- MockUtils.mockServiceGetVertexPartitionOwner(1), conf));
+ new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>(
+ conf,
+ SimpleMessageStore.newFactory(
+ MockUtils.mockServiceGetVertexPartitionOwner(1), conf),
+ context);
server = new NettyServer(conf,
new WorkerRequestServerHandler.Factory(serverData));
server.start();
@@ -169,7 +167,8 @@ public class RequestFailureTest {
serverData =
new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
(conf, SimpleMessageStore.newFactory(
- MockUtils.mockServiceGetVertexPartitionOwner(1), conf));
+ MockUtils.mockServiceGetVertexPartitionOwner(1), conf),
+ context);
server = new NettyServer(conf,
new WorkerRequestServerHandler.Factory(serverData));
server.start();
@@ -204,7 +203,7 @@ public class RequestFailureTest {
serverData =
new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
(conf, SimpleMessageStore.newFactory(
- MockUtils.mockServiceGetVertexPartitionOwner(1), conf));
+ MockUtils.mockServiceGetVertexPartitionOwner(1), conf), context);
server = new NettyServer(conf,
new WorkerRequestServerHandler.Factory(serverData));
server.start();
Modified: giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java?rev=1393251&r1=1393250&r2=1393251&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java Wed Oct 3 01:37:11 2012
@@ -29,15 +29,11 @@ import org.apache.giraph.comm.requests.S
import org.apache.giraph.comm.requests.SendVertexRequest;
import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.EdgeListVertex;
-import org.apache.giraph.graph.GiraphJob;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexMutations;
import org.apache.giraph.graph.partition.PartitionStore;
import org.apache.giraph.utils.MockUtils;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.junit.Before;
import org.junit.Test;
@@ -96,9 +92,11 @@ public class RequestTest {
// Start the service
serverData =
- new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
- (conf, SimpleMessageStore.newFactory(
- MockUtils.mockServiceGetVertexPartitionOwner(1), conf));
+ new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>(
+ conf,
+ SimpleMessageStore.newFactory(
+ MockUtils.mockServiceGetVertexPartitionOwner(1), conf),
+ context);
server = new NettyServer(conf,
new WorkerRequestServerHandler.Factory(serverData));
server.start();
Modified: giraph/trunk/src/test/java/org/apache/giraph/graph/partition/TestPartitionStores.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/graph/partition/TestPartitionStores.java?rev=1393251&r1=1393250&r2=1393251&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/graph/partition/TestPartitionStores.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/graph/partition/TestPartitionStores.java Wed Oct 3 01:37:11 2012
@@ -20,20 +20,18 @@ package org.apache.giraph.graph.partitio
import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.GiraphJob;
import org.apache.giraph.graph.IntIntNullIntVertex;
import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@@ -45,6 +43,7 @@ import java.io.IOException;
*/
public class TestPartitionStores {
private ImmutableClassesGiraphConfiguration conf;
+ private Mapper<?, ?, ?, ?>.Context context;
public static class MyVertex extends IntIntNullIntVertex {
@Override
@@ -58,7 +57,7 @@ public class TestPartitionStores {
NullWritable, IntWritable>... vertices) {
Partition<IntWritable, IntWritable, NullWritable, IntWritable> partition =
new Partition<IntWritable, IntWritable, NullWritable,
- IntWritable>(conf, id);
+ IntWritable>(conf, id, context);
for (Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v :
vertices) {
partition.putVertex(v);
@@ -71,13 +70,14 @@ public class TestPartitionStores {
GiraphConfiguration configuration = new GiraphConfiguration();
configuration.setVertexClass(MyVertex.class);
conf = new ImmutableClassesGiraphConfiguration(configuration);
+ context = mock(Mapper.Context.class);
}
@Test
public void testSimplePartitionStore() {
PartitionStore<IntWritable, IntWritable, NullWritable, IntWritable>
partitionStore = new SimplePartitionStore<IntWritable, IntWritable,
- NullWritable, IntWritable>(conf);
+ NullWritable, IntWritable>(conf, context);
testReadWrite(partitionStore, conf);
}
@@ -88,12 +88,12 @@ public class TestPartitionStores {
PartitionStore<IntWritable, IntWritable, NullWritable, IntWritable>
partitionStore = new DiskBackedPartitionStore<IntWritable,
- IntWritable, NullWritable, IntWritable>(conf);
+ IntWritable, NullWritable, IntWritable>(conf, context);
testReadWrite(partitionStore, conf);
conf.setInt(GiraphConfiguration.MAX_PARTITIONS_IN_MEMORY, 2);
partitionStore = new DiskBackedPartitionStore<IntWritable,
- IntWritable, NullWritable, IntWritable>(conf);
+ IntWritable, NullWritable, IntWritable>(conf, context);
testReadWrite(partitionStore, conf);
}