You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/11/15 21:17:44 UTC
svn commit: r1409973 [1/3] - in /giraph/trunk: ./
giraph/src/main/java/org/apache/giraph/
giraph/src/main/java/org/apache/giraph/benchmark/
giraph/src/main/java/org/apache/giraph/comm/
giraph/src/main/java/org/apache/giraph/comm/aggregators/ giraph/src...
Author: aching
Date: Thu Nov 15 20:17:38 2012
New Revision: 1409973
URL: http://svn.apache.org/viewvc?rev=1409973&view=rev
Log:
GIRAPH-417: Serialize the graph/message cache into byte[] for
improving memory usage and compute speed. (aching)
Added:
giraph/trunk/giraph/src/main/java/org/apache/giraph/benchmark/RepresentativeVertexPageRankBenchmark.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/RepresentativeVertex.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/ByteArrayPartition.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/SimplePartition.java
- copied, changed from r1408926, giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/Partition.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessageCollection.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/DynamicChannelBufferInputStream.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/DynamicChannelBufferOutputStream.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedDataInput.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/UnsafeByteArrayInputStream.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/TestMutableVertex.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/partition/TestGiraphTransferRegulator.java
Removed:
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/VertexIdMessageCollection.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/TestEdgeListVertex.java
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMutationsCache.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendPartitionCache.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/CountingOutputStream.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeListVertex.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/Partition.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/WritableUtils.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/partition/TestPartitionStores.java
Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Thu Nov 15 20:17:38 2012
@@ -1,6 +1,9 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-417: Serialize the graph/message cache into byte[] for
+ improving memory usage and compute speed. (aching)
+
GIRAPH-386: ClassCastException when giraph.SplitMasterWorker=false
(majakabiljo)
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java Thu Nov 15 20:17:38 2012
@@ -28,6 +28,9 @@ import org.apache.giraph.graph.VertexOut
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.graph.WorkerContext;
import org.apache.giraph.graph.partition.GraphPartitionerFactory;
+
+import org.apache.giraph.graph.partition.Partition;
+
import org.apache.hadoop.conf.Configuration;
/**
@@ -78,6 +81,9 @@ public class GiraphConfiguration extends
public static final String AGGREGATOR_WRITER_CLASS =
"giraph.aggregatorWriterClass";
+ /** Partition class - optional */
+ public static final String PARTITION_CLASS = "giraph.partitionClass";
+
/**
* Minimum number of simultaneous workers before this job can run (int)
*/
@@ -620,6 +626,15 @@ public class GiraphConfiguration extends
/** Default is not to do authenticate and authorization with Netty. */
public static final boolean DEFAULT_AUTHENTICATE = false;
+ /** Use unsafe serialization? */
+ public static final String USE_UNSAFE_SERIALIZATION =
+ "giraph.useUnsafeSerialization";
+ /**
+ * Use unsafe serialization default is true (use it if you can,
+ * it's much faster)!
+ */
+ public static final boolean USE_UNSAFE_SERIALIZATION_DEFAULT = true;
+
/**
* Constructor that creates the configuration
*/
@@ -754,6 +769,18 @@ public class GiraphConfiguration extends
}
/**
+ * Set the partition class (optional)
+ *
+ * @param partitionClass Determines the way partitions are stored
+ */
+ public final void setPartitionClass(
+ Class<? extends Partition> partitionClass) {
+ setClass(PARTITION_CLASS,
+ partitionClass,
+ Partition.class);
+ }
+
+ /**
* Set worker configuration for determining what is required for
* a superstep.
*
@@ -987,4 +1014,13 @@ public class GiraphConfiguration extends
public long getInputSplitMaxEdges() {
return getLong(INPUT_SPLIT_MAX_EDGES, INPUT_SPLIT_MAX_EDGES_DEFAULT);
}
+
+ /**
+ * Set whether to use unsafe serialization
+ *
+ * @param useUnsafeSerialization If true, use unsafe serialization
+ */
+ public void useUnsafeSerialization(boolean useUnsafeSerialization) {
+ setBoolean(USE_UNSAFE_SERIALIZATION, useUnsafeSerialization);
+ }
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java Thu Nov 15 20:17:38 2012
@@ -18,6 +18,7 @@
package org.apache.giraph;
+import java.util.List;
import org.apache.giraph.graph.AggregatorWriter;
import org.apache.giraph.graph.Combiner;
import org.apache.giraph.graph.DefaultMasterCompute;
@@ -34,14 +35,21 @@ import org.apache.giraph.graph.WorkerCon
import org.apache.giraph.graph.partition.GraphPartitionerFactory;
import org.apache.giraph.graph.partition.HashPartitionerFactory;
import org.apache.giraph.graph.partition.MasterGraphPartitioner;
+import org.apache.giraph.graph.partition.Partition;
import org.apache.giraph.graph.partition.PartitionStats;
+import org.apache.giraph.graph.partition.SimplePartition;
+import org.apache.giraph.utils.ExtendedByteArrayDataInput;
+import org.apache.giraph.utils.ExtendedByteArrayDataOutput;
+import org.apache.giraph.utils.ExtendedDataInput;
+import org.apache.giraph.utils.ExtendedDataOutput;
import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.giraph.utils.UnsafeByteArrayInputStream;
+import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-
-import java.util.List;
+import org.apache.hadoop.util.Progressable;
/**
* The classes set here are immutable, the remaining configuration is mutable.
@@ -97,6 +105,15 @@ public class ImmutableClassesGiraphConfi
/** Master compute class - cached for fast access */
private final Class<? extends MasterCompute> masterComputeClass;
+ /** Partition class - cached for fast access */
+ private final Class<? extends Partition<I, V, E, M>> partitionClass;
+
+ /**
+ * Use unsafe serialization? Cached for fast access to instantiate the
+ * extended data input/output classes
+ */
+ private final boolean useUnsafeSerialization;
+
/**
* Constructor. Takes the configuration and then gets the classes out of
* them for Giraph
@@ -146,6 +163,12 @@ public class ImmutableClassesGiraphConfi
DefaultWorkerContext.class, WorkerContext.class);
masterComputeClass = conf.getClass(MASTER_COMPUTE_CLASS,
DefaultMasterCompute.class, MasterCompute.class);
+
+ partitionClass = (Class<? extends Partition<I, V, E, M>>)
+ conf.getClass(PARTITION_CLASS, SimplePartition.class);
+
+ useUnsafeSerialization = getBoolean(USE_UNSAFE_SERIALIZATION,
+ USE_UNSAFE_SERIALIZATION_DEFAULT);
}
/**
@@ -493,4 +516,88 @@ public class ImmutableClassesGiraphConfi
}
}
}
+
+ /**
+ * Create a partition
+ *
+ * @param id Partition id
+ * @param progressable Progressable for reporting progress
+ * @return Instantiated partition
+ */
+ public Partition<I, V, E, M> createPartition(
+ int id, Progressable progressable) {
+ Partition<I, V, E, M> partition =
+ ReflectionUtils.newInstance(partitionClass, this);
+ partition.initialize(id, progressable);
+ return partition;
+ }
+
+ /**
+ * Use unsafe serialization?
+ *
+ * @return True if using unsafe serialization, false otherwise.
+ */
+ public boolean useUnsafeSerialization() {
+ return useUnsafeSerialization;
+ }
+
+ /**
+ * Create an extended data output (can be subclassed)
+ *
+ * @return ExtendedDataOutput object
+ */
+ public ExtendedDataOutput createExtendedDataOutput() {
+ if (useUnsafeSerialization) {
+ return new UnsafeByteArrayOutputStream();
+ } else {
+ return new ExtendedByteArrayDataOutput();
+ }
+ }
+
+ /**
+ * Create an extended data output (can be subclassed)
+ *
+ * @param expectedSize Expected size
+ * @return ExtendedDataOutput object
+ */
+ public ExtendedDataOutput createExtendedDataOutput(int expectedSize) {
+ if (useUnsafeSerialization) {
+ return new UnsafeByteArrayOutputStream(expectedSize);
+ } else {
+ return new ExtendedByteArrayDataOutput(expectedSize);
+ }
+ }
+
+ /**
+ * Create an extended data output (can be subclassed)
+ *
+ * @param buf Buffer to use for the output (reuse perhaps)
+ * @param pos How much of the buffer is already used
+ * @return ExtendedDataOutput object
+ */
+ public ExtendedDataOutput createExtendedDataOutput(byte[] buf,
+ int pos) {
+ if (useUnsafeSerialization) {
+ return new UnsafeByteArrayOutputStream(buf, pos);
+ } else {
+ return new ExtendedByteArrayDataOutput(buf, pos);
+ }
+ }
+
+ /**
+ * Create an extended data input (can be subclassed)
+ *
+ * @param buf Buffer to use for the input
+ * @param off Where to start reading in the buffer
+ * @param length Maximum length of the buffer
+ * @return ExtendedDataInput object
+ */
+ public ExtendedDataInput createExtendedDataInput(
+ byte[] buf, int off, int length) {
+ if (useUnsafeSerialization) {
+ return new UnsafeByteArrayInputStream(buf, off, length);
+ } else {
+ return new ExtendedByteArrayDataInput(buf, off, length);
+ }
+ }
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java Thu Nov 15 20:17:38 2012
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.giraph.benchmark;
import org.apache.commons.cli.CommandLine;
@@ -75,7 +74,9 @@ public class PageRankBenchmark implement
options.addOption("c",
"vertexClass",
true,
- "Vertex class (0 for HashMapVertex, 1 for EdgeListVertex)");
+ "Vertex class (0 for HashMapVertex, 1 for EdgeListVertex, " +
+ "2 for RepresentativeVertex, " +
+ "3 for RepresentativeVertex with unsafe)");
options.addOption("N",
"name",
true,
@@ -125,9 +126,17 @@ public class PageRankBenchmark implement
(Integer.parseInt(cmd.getOptionValue('c')) == 1)) {
job.getConfiguration().setVertexClass(
EdgeListVertexPageRankBenchmark.class);
- } else {
+ } else if (Integer.parseInt(cmd.getOptionValue('c')) == 0) {
job.getConfiguration().setVertexClass(
HashMapVertexPageRankBenchmark.class);
+ } else if (Integer.parseInt(cmd.getOptionValue('c')) == 2) {
+ job.getConfiguration().setVertexClass(
+ RepresentativeVertexPageRankBenchmark.class);
+ job.getConfiguration().useUnsafeSerialization(false);
+ } else if (Integer.parseInt(cmd.getOptionValue('c')) == 3) {
+ job.getConfiguration().setVertexClass(
+ RepresentativeVertexPageRankBenchmark.class);
+ job.getConfiguration().useUnsafeSerialization(true);
}
LOG.info("Using class " +
job.getConfiguration().get(GiraphConfiguration.VERTEX_CLASS));
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/benchmark/RepresentativeVertexPageRankBenchmark.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/benchmark/RepresentativeVertexPageRankBenchmark.java?rev=1409973&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/benchmark/RepresentativeVertexPageRankBenchmark.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/benchmark/RepresentativeVertexPageRankBenchmark.java Thu Nov 15 20:17:38 2012
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.benchmark;
+
+import java.io.IOException;
+import org.apache.giraph.graph.RepresentativeVertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Same benchmark code as {@link PageRankBenchmark}, but uses
+ * {@link org.apache.giraph.graph.RepresentativeVertex}
+ * implementation.
+ */
+public class RepresentativeVertexPageRankBenchmark extends
+ RepresentativeVertex<LongWritable, DoubleWritable,
+ DoubleWritable, DoubleWritable> {
+ @Override
+ public void compute(Iterable<DoubleWritable> messages) throws
+ IOException {
+ PageRankComputation.computePageRank(this, messages);
+ }
+}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java Thu Nov 15 20:17:38 2012
@@ -18,20 +18,19 @@
package org.apache.giraph.comm;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
-
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.graph.WorkerInfo;
import org.apache.giraph.graph.partition.PartitionOwner;
+import org.apache.giraph.utils.ByteArrayVertexIdMessageCollection;
import org.apache.giraph.utils.PairList;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
/**
* Aggregates the messages to be send to workers so they can be sent
* in bulk. Not thread-safe.
@@ -43,7 +42,7 @@ import com.google.common.collect.Maps;
public class SendMessageCache<I extends WritableComparable,
M extends Writable> {
/** Internal cache */
- private final VertexIdMessageCollection<I, M>[] messageCache;
+ private final ByteArrayVertexIdMessageCollection<I, M>[] messageCache;
/** Number of messages in each partition */
private final int[] messageCounts;
/** List of partition ids belonging to a worker */
@@ -74,7 +73,7 @@ public class SendMessageCache<I extends
workerPartitionIds.add(partitionOwner.getPartitionId());
maxPartition = Math.max(partitionOwner.getPartitionId(), maxPartition);
}
- messageCache = new VertexIdMessageCollection[maxPartition + 1];
+ messageCache = new ByteArrayVertexIdMessageCollection[maxPartition + 1];
int maxWorker = 0;
for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
@@ -96,10 +95,11 @@ public class SendMessageCache<I extends
public int addMessage(WorkerInfo workerInfo,
final int partitionId, I destVertexId, M message) {
// Get the message collection
- VertexIdMessageCollection<I, M> partitionMessages =
+ ByteArrayVertexIdMessageCollection<I, M> partitionMessages =
messageCache[partitionId];
if (partitionMessages == null) {
- partitionMessages = new VertexIdMessageCollection<I, M>(conf);
+ partitionMessages = new ByteArrayVertexIdMessageCollection<I, M>();
+ partitionMessages.setConf(conf);
partitionMessages.initialize();
messageCache[partitionId] = partitionMessages;
}
@@ -115,13 +115,13 @@ public class SendMessageCache<I extends
*
* @param workerInfo the address of the worker who owns the data
* partitions that are receiving the messages
- * @return List of pairs (partitionId, VertexIdMessageCollection),
+ * @return List of pairs (partitionId, ByteArrayVertexIdMessageCollection),
* where all partition ids belong to workerInfo
*/
- public PairList<Integer, VertexIdMessageCollection<I, M>>
+ public PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>>
removeWorkerMessages(WorkerInfo workerInfo) {
- PairList<Integer, VertexIdMessageCollection<I, M>> workerMessages =
- new PairList<Integer, VertexIdMessageCollection<I, M>>();
+ PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>> workerMessages =
+ new PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>>();
workerMessages.initialize();
for (Integer partitionId : workerPartitions.get(workerInfo)) {
if (messageCache[partitionId] != null) {
@@ -139,13 +139,15 @@ public class SendMessageCache<I extends
* @return All vertex messages for all partitions
*/
public PairList<WorkerInfo, PairList<
- Integer, VertexIdMessageCollection<I, M>>> removeAllMessages() {
- PairList<WorkerInfo, PairList<Integer, VertexIdMessageCollection<I, M>>>
+ Integer, ByteArrayVertexIdMessageCollection<I, M>>> removeAllMessages() {
+ PairList<WorkerInfo, PairList<Integer,
+ ByteArrayVertexIdMessageCollection<I, M>>>
allMessages = new PairList<WorkerInfo,
- PairList<Integer, VertexIdMessageCollection<I, M>>>();
+ PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>>>();
allMessages.initialize();
for (WorkerInfo workerInfo : workerPartitions.keySet()) {
- PairList<Integer, VertexIdMessageCollection<I, M>> workerMessages =
+ PairList<Integer, ByteArrayVertexIdMessageCollection<I,
+ M>> workerMessages =
removeWorkerMessages(workerInfo);
if (!workerMessages.isEmpty()) {
allMessages.add(workerInfo, workerMessages);
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMutationsCache.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMutationsCache.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMutationsCache.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMutationsCache.java Thu Nov 15 20:17:38 2012
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.giraph.comm;
import java.util.HashMap;
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendPartitionCache.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendPartitionCache.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendPartitionCache.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendPartitionCache.java Thu Nov 15 20:17:38 2012
@@ -94,8 +94,7 @@ public class SendPartitionCache<I extend
Partition<I, V, E, M> partition =
ownerPartitionMap.get(partitionOwner);
if (partition == null) {
- partition = new Partition<I, V, E, M>(
- configuration,
+ partition = configuration.createPartition(
partitionOwner.getPartitionId(),
context);
ownerPartitionMap.put(partitionOwner, partition);
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/CountingOutputStream.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/CountingOutputStream.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/CountingOutputStream.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/CountingOutputStream.java Thu Nov 15 20:17:38 2012
@@ -18,9 +18,9 @@
package org.apache.giraph.comm.aggregators;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
+import org.apache.giraph.utils.ExtendedByteArrayDataOutput;
+import org.apache.giraph.utils.ExtendedDataOutput;
/**
* Wrapper for output stream which keeps the place in the beginning for the
@@ -28,9 +28,7 @@ import java.io.IOException;
*/
public abstract class CountingOutputStream {
/** DataOutput to which subclasses will be writing data */
- protected DataOutputStream dataOutput;
- /** Byte output stream used by dataOutput */
- private final ExtendedByteArrayOutputStream byteOutput;
+ protected ExtendedDataOutput dataOutput;
/** Counter for objects which were written to the stream */
private int counter;
@@ -38,8 +36,7 @@ public abstract class CountingOutputStre
* Default constructor
*/
public CountingOutputStream() {
- byteOutput = new ExtendedByteArrayOutputStream();
- dataOutput = new DataOutputStream(byteOutput);
+ dataOutput = new ExtendedByteArrayDataOutput();
reset();
}
@@ -56,7 +53,7 @@ public abstract class CountingOutputStre
* @return Number of bytes
*/
protected int getSize() {
- return byteOutput.size();
+ return dataOutput.getPos();
}
/**
@@ -65,14 +62,9 @@ public abstract class CountingOutputStre
* @return Number of objects followed by the data written to the stream
*/
public byte[] flush() {
- byteOutput.writeIntOnPosition(counter, 0);
- try {
- dataOutput.flush();
- } catch (IOException e) {
- throw new IllegalStateException(
- "flush: IOException occurred while flushing", e);
- }
- byte[] ret = byteOutput.toByteArray();
+ dataOutput.writeInt(0, counter);
+ // Actual flush not required, this is a byte array
+ byte[] ret = dataOutput.toByteArray();
reset();
return ret;
}
@@ -81,39 +73,13 @@ public abstract class CountingOutputStre
* Reset the stream
*/
private void reset() {
- byteOutput.reset();
- dataOutput = new DataOutputStream(byteOutput);
+ dataOutput.reset();
// reserve the place for count to be written in the end
- for (int i = 0; i < 4; i++) {
- byteOutput.write(0);
+ try {
+ dataOutput.writeInt(0);
+ } catch (IOException e) {
+ throw new IllegalStateException("reset: Got IOException", e);
}
counter = 0;
}
-
- /**
- * Subclass of {@link ByteArrayOutputStream} which provides an option to
- * write int value over previously written data
- */
- public static class ExtendedByteArrayOutputStream extends
- ByteArrayOutputStream {
-
- /**
- * Write integer value over previously written data at certain
- * position in byte array
- *
- * @param value Value to write
- * @param position Position from which to write
- */
- public void writeIntOnPosition(int value, int position) {
- if (position + 4 > count) {
- throw new IndexOutOfBoundsException(
- "writeIntOnPosition: Tried to write int to position " + position +
- " but current length is " + count);
- }
- buf[position] = (byte) ((value >>> 24) & 0xFF);
- buf[position + 1] = (byte) ((value >>> 16) & 0xFF);
- buf[position + 2] = (byte) ((value >>> 8) & 0xFF);
- buf[position + 3] = (byte) ((value >>> 0) & 0xFF);
- }
- }
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java Thu Nov 15 20:17:38 2012
@@ -18,15 +18,9 @@
package org.apache.giraph.comm.messages;
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.VertexIdMessageCollection;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -36,6 +30,10 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.utils.ByteArrayVertexIdMessageCollection;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
/**
* Message store which separates data by partitions,
@@ -101,14 +99,16 @@ public class DiskBackedMessageStoreByPar
}
@Override
- public void addPartitionMessages(VertexIdMessageCollection<I, M> messages,
+ public void addPartitionMessages(
+ ByteArrayVertexIdMessageCollection<I, M> messages,
int partitionId) throws IOException {
Map<I, Collection<M>> map = Maps.newHashMap();
- VertexIdMessageCollection<I, M>.Iterator iterator = messages.getIterator();
+ ByteArrayVertexIdMessageCollection<I, M>.Iterator iterator =
+ messages.getIterator();
while (iterator.hasNext()) {
iterator.next();
- I vertexId = iterator.getCurrentFirst();
- M message = iterator.getCurrentSecond();
+ I vertexId = iterator.getCurrentVertexId();
+ M message = iterator.getCurrentMessage();
Collection<M> currentMessages = map.get(vertexId);
if (currentMessages == null) {
currentMessages = Lists.newArrayList(message);
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java Thu Nov 15 20:17:38 2012
@@ -18,15 +18,14 @@
package org.apache.giraph.comm.messages;
-import org.apache.giraph.comm.VertexIdMessageCollection;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
+import org.apache.giraph.utils.ByteArrayVertexIdMessageCollection;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
/**
* Message store which stores data by partition
@@ -53,7 +52,7 @@ public interface MessageStoreByPartition
* @param partitionId Id of partition
* @throws IOException
*/
- void addPartitionMessages(VertexIdMessageCollection<I, M> messages,
+ void addPartitionMessages(ByteArrayVertexIdMessageCollection<I, M> messages,
int partitionId) throws IOException;
/**
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java Thu Nov 15 20:17:38 2012
@@ -20,14 +20,12 @@ package org.apache.giraph.comm.messages;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.VertexIdMessageCollection;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import com.google.common.collect.Lists;
import com.google.common.collect.MapMaker;
import com.google.common.collect.Maps;
-
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -37,6 +35,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
+import org.apache.giraph.utils.ByteArrayVertexIdMessageCollection;
+
/**
* Abstract class for {@link MessageStoreByPartition} which allows any kind
* of object to hold messages for one vertex.
@@ -189,16 +189,18 @@ public abstract class SimpleMessageStore
}
@Override
- public void addPartitionMessages(VertexIdMessageCollection<I, M> messages,
+ public void addPartitionMessages(ByteArrayVertexIdMessageCollection<I,
+ M> messages,
int partitionId) throws IOException {
ConcurrentMap<I, T> partitionMap =
getOrCreatePartitionMap(partitionId);
- VertexIdMessageCollection<I, M>.Iterator iterator = messages.getIterator();
+ ByteArrayVertexIdMessageCollection<I, M>.Iterator iterator =
+ messages.getIterator();
while (iterator.hasNext()) {
iterator.next();
- I vertexId = iterator.getCurrentFirst();
- M message = iterator.getCurrentSecond();
+ I vertexId = iterator.getCurrentVertexId();
+ M message = iterator.getCurrentMessage();
addVertexMessageToPartition(vertexId, message, partitionMap);
}
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java Thu Nov 15 20:17:38 2012
@@ -17,6 +17,10 @@
*/
package org.apache.giraph.comm.netty;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.bsp.CentralizedServiceWorker;
@@ -24,7 +28,6 @@ import org.apache.giraph.comm.SendMessag
import org.apache.giraph.comm.SendMutationsCache;
import org.apache.giraph.comm.SendPartitionCache;
import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.comm.VertexIdMessageCollection;
import org.apache.giraph.comm.WorkerClient;
import org.apache.giraph.comm.WorkerClientRequestProcessor;
import org.apache.giraph.comm.messages.MessageStoreByPartition;
@@ -43,18 +46,13 @@ import org.apache.giraph.graph.partition
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.MetricGroup;
import org.apache.giraph.metrics.ValueGauge;
+import org.apache.giraph.utils.ByteArrayVertexIdMessageCollection;
import org.apache.giraph.utils.PairList;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
-import com.google.common.collect.Maps;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-
/**
* Aggregate requests and sends them to the thread-safe NettyClient. This
* class is not thread-safe and expected to be used and then thrown away after
@@ -150,7 +148,8 @@ public class NettyWorkerClientRequestPro
// Send a request if the cache of outgoing message to
// the remote worker 'workerInfo' is full enough to be flushed
if (workerMessageCount >= maxMessagesPerWorker) {
- PairList<Integer, VertexIdMessageCollection<I, M>> workerMessages =
+ PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>>
+ workerMessages =
sendMessageCache.removeWorkerMessages(workerInfo);
WritableRequest writableRequest =
new SendWorkerMessagesRequest<I, V, E, M>(workerMessages);
@@ -171,8 +170,7 @@ public class NettyWorkerClientRequestPro
}
WritableRequest vertexRequest =
- new SendVertexRequest<I, V, E, M>(partitionId,
- partition.getVertices());
+ new SendVertexRequest<I, V, E, M>(partition);
doRequest(workerInfo, vertexRequest);
// Messages are stored separately
@@ -325,10 +323,11 @@ public class NettyWorkerClientRequestPro
sendPartitionCache.clear();
// Execute the remaining sends messages (if any)
- PairList<WorkerInfo, PairList<Integer, VertexIdMessageCollection<I, M>>>
+ PairList<WorkerInfo, PairList<Integer,
+ ByteArrayVertexIdMessageCollection<I, M>>>
remainingMessageCache = sendMessageCache.removeAllMessages();
PairList<WorkerInfo,
- PairList<Integer, VertexIdMessageCollection<I, M>>>.Iterator
+ PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>>>.Iterator
iterator = remainingMessageCache.getIterator();
while (iterator.hasNext()) {
iterator.next();
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java Thu Nov 15 20:17:38 2012
@@ -19,17 +19,14 @@
package org.apache.giraph.comm.requests;
import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.partition.Partition;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;
-import com.google.common.collect.Lists;
-
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.Collection;
/**
* Send a collection of vertices for a partition.
@@ -46,10 +43,8 @@ public class SendVertexRequest<I extends
/** Class logger */
private static final Logger LOG =
Logger.getLogger(SendVertexRequest.class);
- /** Partition id */
- private int partitionId;
- /** List of vertices to be stored on this partition */
- private Collection<Vertex<I, V, E, M>> vertices;
+ /** Partition */
+ private Partition<I, V, E, M> partition;
/**
* Constructor used for reflection only
@@ -59,34 +54,21 @@ public class SendVertexRequest<I extends
/**
* Constructor for sending a request.
*
- * @param partitionId Partition to send the request to
- * @param vertices Vertices to send
+ * @param partition Partition to send the request to
*/
- public SendVertexRequest(int partitionId,
- Collection<Vertex<I, V, E, M>> vertices) {
- this.partitionId = partitionId;
- this.vertices = vertices;
+ public SendVertexRequest(Partition<I, V, E, M> partition) {
+ this.partition = partition;
}
@Override
public void readFieldsRequest(DataInput input) throws IOException {
- partitionId = input.readInt();
- int verticesCount = input.readInt();
- vertices = Lists.newArrayListWithCapacity(verticesCount);
- for (int i = 0; i < verticesCount; ++i) {
- Vertex<I, V, E, M> vertex = getConf().createVertex();
- vertex.readFields(input);
- vertices.add(vertex);
- }
+ partition = getConf().createPartition(-1, null);
+ partition.readFields(input);
}
@Override
public void writeRequest(DataOutput output) throws IOException {
- output.writeInt(partitionId);
- output.writeInt(vertices.size());
- for (Vertex<I, V, E, M> vertex : vertices) {
- vertex.write(output);
- }
+ partition.write(output);
}
@Override
@@ -96,12 +78,7 @@ public class SendVertexRequest<I extends
@Override
public void doRequest(ServerData<I, V, E, M> serverData) {
- if (vertices.isEmpty()) {
- LOG.warn("doRequest: Got an empty request!");
- return;
- }
- serverData.getPartitionStore().addPartitionVertices(partitionId,
- vertices);
+ serverData.getPartitionStore().addPartition(partition);
}
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java Thu Nov 15 20:17:38 2012
@@ -18,17 +18,16 @@
package org.apache.giraph.comm.requests;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.comm.VertexIdMessageCollection;
+import org.apache.giraph.utils.ByteArrayVertexIdMessageCollection;
import org.apache.giraph.utils.PairList;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
/**
* Send a collection of vertex messages for a partition.
*
@@ -49,7 +48,7 @@ public class SendWorkerMessagesRequest<I
* are owned by a single (destination) worker. These messages are all
* destined for this worker.
* */
- private PairList<Integer, VertexIdMessageCollection<I, M>>
+ private PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>>
partitionVertexMessages;
/**
@@ -60,10 +59,11 @@ public class SendWorkerMessagesRequest<I
/**
* Constructor used to send request.
*
- * @param partVertMsgs Map of remote partitions => VertexIdMessageCollection
+ * @param partVertMsgs Map of remote partitions =>
+ * ByteArrayVertexIdMessageCollection
*/
public SendWorkerMessagesRequest(
- PairList<Integer, VertexIdMessageCollection<I, M>> partVertMsgs) {
+ PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>> partVertMsgs) {
super();
this.partitionVertexMessages = partVertMsgs;
}
@@ -72,12 +72,13 @@ public class SendWorkerMessagesRequest<I
public void readFieldsRequest(DataInput input) throws IOException {
int numPartitions = input.readInt();
partitionVertexMessages =
- new PairList<Integer, VertexIdMessageCollection<I, M>>();
+ new PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>>();
partitionVertexMessages.initialize(numPartitions);
while (numPartitions-- > 0) {
final int partitionId = input.readInt();
- VertexIdMessageCollection<I, M> vertexIdMessages =
- new VertexIdMessageCollection<I, M>(getConf());
+ ByteArrayVertexIdMessageCollection<I, M> vertexIdMessages =
+ new ByteArrayVertexIdMessageCollection<I, M>();
+ vertexIdMessages.setConf(getConf());
vertexIdMessages.readFields(input);
partitionVertexMessages.add(partitionId, vertexIdMessages);
}
@@ -86,8 +87,8 @@ public class SendWorkerMessagesRequest<I
@Override
public void writeRequest(DataOutput output) throws IOException {
output.writeInt(partitionVertexMessages.getSize());
- PairList<Integer, VertexIdMessageCollection<I, M>>.Iterator iterator =
- partitionVertexMessages.getIterator();
+ PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>>.Iterator
+ iterator = partitionVertexMessages.getIterator();
while (iterator.hasNext()) {
iterator.next();
output.writeInt(iterator.getCurrentFirst());
@@ -102,8 +103,8 @@ public class SendWorkerMessagesRequest<I
@Override
public void doRequest(ServerData<I, V, E, M> serverData) {
- PairList<Integer, VertexIdMessageCollection<I, M>>.Iterator iterator =
- partitionVertexMessages.getIterator();
+ PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>>.Iterator
+ iterator = partitionVertexMessages.getIterator();
while (iterator.hasNext()) {
iterator.next();
try {
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Thu Nov 15 20:17:38 2012
@@ -525,7 +525,7 @@ else[HADOOP_NON_SECURE]*/
!getPartitionStore().hasPartition(
partitionOwner.getPartitionId())) {
Partition<I, V, E, M> partition =
- new Partition<I, V, E, M>(getConfiguration(),
+ getConfiguration().createPartition(
partitionOwner.getPartitionId(), getContext());
getPartitionStore().addPartition(partition);
}
@@ -546,7 +546,7 @@ else[HADOOP_NON_SECURE]*/
getPartitionStore().getPartitions()) {
PartitionStats partitionStats =
new PartitionStats(partition.getId(),
- partition.getVertices().size(),
+ partition.getVertexCount(),
0,
partition.getEdgeCount(),
0);
@@ -847,7 +847,7 @@ else[HADOOP_NON_SECURE]*/
vertexWriter.initialize(getContext());
for (Partition<I, V, E, M> partition :
getPartitionStore().getPartitions()) {
- for (Vertex<I, V, E, M> vertex : partition.getVertices()) {
+ for (Vertex<I, V, E, M> vertex : partition) {
getContext().progress();
vertexWriter.writeVertex(vertex);
}
@@ -1057,10 +1057,7 @@ else[HADOOP_NON_SECURE]*/
}
metadataStream.close();
Partition<I, V, E, M> partition =
- new Partition<I, V, E, M>(
- getConfiguration(),
- partitionId,
- getContext());
+ getConfiguration().createPartition(partitionId, getContext());
DataInputStream partitionsStream =
getFs().open(new Path(partitionsFile));
if (partitionsStream.skip(startPos) != startPos) {
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java Thu Nov 15 20:17:38 2012
@@ -187,7 +187,7 @@ public class ComputeCallable<I extends W
new PartitionStats(partition.getId(), 0, 0, 0, 0);
// Make sure this is thread-safe across runs
synchronized (partition) {
- for (Vertex<I, V, E, M> vertex : partition.getVertices()) {
+ for (Vertex<I, V, E, M> vertex : partition) {
// Make sure every vertex has this thread's
// graphState before computing
vertex.setGraphState(graphState);
@@ -205,6 +205,8 @@ public class ComputeCallable<I extends W
} finally {
computeOneTimerContext.stop();
}
+ // Need to save the vertex changes (possibly)
+ partition.saveVertex(vertex);
}
if (vertex.isHalted()) {
partitionStats.incrFinishedVertexCount();
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeListVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeListVertex.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeListVertex.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeListVertex.java Thu Nov 15 20:17:38 2012
@@ -34,7 +34,10 @@ import java.util.Map;
/**
* User applications can subclass {@link EdgeListVertex}, which stores
* the outbound edges in an ArrayList (less memory as the cost of expensive
- * random-access lookup). Good for static graphs.
+ * random-access lookup). Good for static graphs. Not nearly as memory
+ * efficient as using RepresentativeVertex + ByteArrayPartition
+ * (probably about 10x more), but not bad when keeping vertices as objects in
+ * memory (SimplePartition).
*
* @param <I> Vertex index value
* @param <V> Vertex value
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java Thu Nov 15 20:17:38 2012
@@ -133,6 +133,7 @@ public abstract class LongDoubleFloatDou
long id = in.readLong();
double value = in.readDouble();
initialize(new LongWritable(id), new DoubleWritable(value));
+ edgeMap.clear();
long edgeMapSize = in.readLong();
for (long i = 0; i < edgeMapSize; ++i) {
long targetVertexId = in.readLong();
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/RepresentativeVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/RepresentativeVertex.java?rev=1409973&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/RepresentativeVertex.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/RepresentativeVertex.java Thu Nov 15 20:17:38 2012
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.graph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.giraph.utils.ExtendedDataInput;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+/**
+ * This vertex should only be used in conjunction with ByteArrayPartition since
+ * it has special code to deserialize by reusing objects and not instantiating
+ * new ones. If used without ByteArrayPartition, it will cause a lot of
+ * wasted memory.
+ *
+ * Also, this vertex is optimized for space and not efficient for either adding
+ * or random access of edges.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public abstract class RepresentativeVertex<
+ I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable>
+ extends MutableVertex<I, V, E, M> implements Iterable<Edge<I, E>> {
+ /** Class logger */
+ private static final Logger LOG =
+ Logger.getLogger(RepresentativeVertex.class);
+ /** Representative edge */
+ private final Edge<I, E> representativeEdge = new Edge<I, E>();
+ /** Serialized edges */
+ private byte[] serializedEdges;
+ /** Byte used in serializedEdges */
+ private int serializedEdgesBytesUsed;
+ /** Number of edges */
+ private int edgeCount;
+
+ @Override
+ public final void initialize(I id, V value, Map<I, E> edges) {
+ // Make sure the initial values exist
+ representativeEdge.setTargetVertexId(getConf().createVertexId());
+ representativeEdge.setValue(getConf().createEdgeValue());
+ super.initialize(id, value, edges);
+ }
+
+ @Override
+ public final void initialize(I id, V value) {
+ // Make sure the initial values exist
+ representativeEdge.setTargetVertexId(getConf().createVertexId());
+ representativeEdge.setValue(getConf().createEdgeValue());
+ super.initialize(id, value);
+ }
+
+ /**
+ * Iterator that uses the representative edge (only one iterator allowed
+ * at a time)
+ */
+ private final class RepresentativeEdgeIterator implements
+ Iterator<Edge<I, E>> {
+ /** Input for processing the bytes */
+ private final ExtendedDataInput extendedDataInput;
+
+ /** Constructor. */
+ RepresentativeEdgeIterator() {
+ extendedDataInput = getConf().createExtendedDataInput(
+ serializedEdges, 0, serializedEdgesBytesUsed);
+ }
+ @Override
+ public boolean hasNext() {
+ return serializedEdges != null && extendedDataInput.available() > 0;
+ }
+
+ @Override
+ public Edge<I, E> next() {
+ try {
+ representativeEdge.getTargetVertexId().readFields(extendedDataInput);
+ representativeEdge.getValue().readFields(extendedDataInput);
+ } catch (IOException e) {
+ throw new IllegalStateException("next: Failed on pos " +
+ extendedDataInput.getPos() + " edge " + representativeEdge);
+ }
+ return representativeEdge;
+ }
+
+ @Override
+ public void remove() {
+ throw new IllegalAccessError("remove: Not supported");
+ }
+ }
+
+ @Override
+ public final Iterator<Edge<I, E>> iterator() {
+ return new RepresentativeEdgeIterator();
+ }
+
+ @Override
+ public final void setEdges(Map<I, E> edges) {
+ ExtendedDataOutput extendedOutputStream =
+ getConf().createExtendedDataOutput();
+ if (edges != null) {
+ for (Map.Entry<I, E> edge : edges.entrySet()) {
+ try {
+ edge.getKey().write(extendedOutputStream);
+ edge.getValue().write(extendedOutputStream);
+ } catch (IOException e) {
+ throw new IllegalStateException("setEdges: Failed to serialize " +
+ edge);
+ }
+ ++edgeCount;
+ }
+ }
+ serializedEdges = extendedOutputStream.getByteArray();
+ serializedEdgesBytesUsed = extendedOutputStream.getPos();
+ }
+
+ @Override
+ public final Iterable<Edge<I, E>> getEdges() {
+ return this;
+ }
+
+ @Override
+ public final boolean addEdge(I targetVertexId, E value) {
+ // Note that this is very expensive (deserializes all edges
+ // in an addEdge() request).
+ // Hopefully the user set all the edges in setEdges().
+ for (Edge<I, E> edge : getEdges()) {
+ if (edge.getTargetVertexId().equals(targetVertexId)) {
+ LOG.warn("addEdge: Vertex=" + getId() +
+ ": already added an edge value for target vertex id " +
+ targetVertexId);
+ return false;
+ }
+ }
+ ExtendedDataOutput extendedDataOutput =
+ getConf().createExtendedDataOutput(
+ serializedEdges, serializedEdgesBytesUsed);
+ try {
+ targetVertexId.write(extendedDataOutput);
+ value.write(extendedDataOutput);
+ } catch (IOException e) {
+ throw new IllegalStateException("addEdge: Failed to write to the " +
+ "new byte array");
+ }
+ serializedEdges = extendedDataOutput.getByteArray();
+ serializedEdgesBytesUsed = extendedDataOutput.getPos();
+ ++edgeCount;
+ return true;
+ }
+
+ @Override
+ public final int getNumEdges() {
+ return edgeCount;
+ }
+
+ @Override
+ public final E removeEdge(I targetVertexId) {
+ // Note that this is very expensive (deserializes all edges
+ // in an removedge() request).
+ // Hopefully the user set all the edges correctly in setEdges().
+ RepresentativeEdgeIterator iterator = new RepresentativeEdgeIterator();
+ int foundStartOffset = 0;
+ while (iterator.hasNext()) {
+ Edge<I, E> edge = iterator.next();
+ if (edge.getTargetVertexId().equals(targetVertexId)) {
+ System.arraycopy(serializedEdges, iterator.extendedDataInput.getPos(),
+ serializedEdges, foundStartOffset,
+ serializedEdgesBytesUsed - iterator.extendedDataInput.getPos());
+ serializedEdgesBytesUsed -=
+ iterator.extendedDataInput.getPos() - foundStartOffset;
+ --edgeCount;
+ return edge.getValue();
+ }
+ foundStartOffset = iterator.extendedDataInput.getPos();
+ }
+
+ return null;
+ }
+
+ @Override
+ public final void readFields(DataInput in) throws IOException {
+ // Ensure these objects are present
+ if (representativeEdge.getTargetVertexId() == null) {
+ representativeEdge.setTargetVertexId(getConf().createVertexId());
+ }
+
+ if (representativeEdge.getValue() == null) {
+ representativeEdge.setValue(getConf().createEdgeValue());
+ }
+
+ I vertexId = getId();
+ if (vertexId == null) {
+ vertexId = getConf().createVertexId();
+ }
+ vertexId.readFields(in);
+
+ V vertexValue = getValue();
+ if (vertexValue == null) {
+ vertexValue = getConf().createVertexValue();
+ }
+ vertexValue.readFields(in);
+
+ initialize(vertexId, vertexValue);
+
+ serializedEdgesBytesUsed = in.readInt();
+ // Only create a new buffer if the old one isn't big enough
+ if (serializedEdges == null ||
+ serializedEdgesBytesUsed > serializedEdges.length) {
+ serializedEdges = new byte[serializedEdgesBytesUsed];
+ }
+ in.readFully(serializedEdges, 0, serializedEdgesBytesUsed);
+ edgeCount = in.readInt();
+
+ boolean halt = in.readBoolean();
+ if (halt) {
+ voteToHalt();
+ } else {
+ wakeUp();
+ }
+ }
+
+ @Override
+ public final void write(DataOutput out) throws IOException {
+ getId().write(out);
+ getValue().write(out);
+
+ out.writeInt(serializedEdgesBytesUsed);
+ out.write(serializedEdges, 0, serializedEdgesBytesUsed);
+ out.writeInt(edgeCount);
+
+ out.writeBoolean(isHalted());
+ }
+}
+
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/ByteArrayPartition.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/ByteArrayPartition.java?rev=1409973&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/ByteArrayPartition.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/ByteArrayPartition.java Thu Nov 15 20:17:38 2012
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.graph.partition;
+
+import com.google.common.collect.MapMaker;
+
+import com.google.common.primitives.Ints;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.utils.UnsafeByteArrayInputStream;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.Progressable;
+import org.apache.log4j.Logger;
+
+/**
+ * Byte array based partition. Should reduce the amount of memory used since
+ * the entire graph is compressed into byte arrays. Must guarantee, however,
+ * that only one thread at a time will call getVertex since it is a singleton.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public class ByteArrayPartition<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    implements Partition<I, V, E, M> {
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(ByteArrayPartition.class);
+  /** Configuration from the worker */
+  private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+  /** Partition id */
+  private int id;
+  /**
+   * Vertex map for this range (keyed by index). Note that the byte[] is a
+   * serialized vertex with the first four bytes as the length of the vertex
+   * to read.
+   */
+  private ConcurrentMap<I, byte[]> vertexMap;
+  /** Context used to report progress */
+  private Progressable progressable;
+  /**
+   * Representative vertex. getVertex(), putVertex(), removeVertex(),
+   * getEdgeCount() and the iterator all deserialize into this one object,
+   * so a returned vertex is only valid until the next such call.
+   */
+  private Vertex<I, V, E, M> representativeVertex;
+  /** Use unsafe serialization */
+  private boolean useUnsafeSerialization;
+
+  /**
+   * Constructor for reflection.
+   */
+  public ByteArrayPartition() { }
+
+  /**
+   * Set up the configuration-derived state shared by initialize() and
+   * readFields(): the representative vertex and the serialization mode.
+   * Requires setConf() to have been called.
+   */
+  private void initializeRepresentativeState() {
+    representativeVertex = conf.createVertex();
+    useUnsafeSerialization = conf.useUnsafeSerialization();
+  }
+
+  @Override
+  public void initialize(int partitionId, Progressable progressable) {
+    setId(partitionId);
+    setProgressable(progressable);
+    vertexMap = new MapMaker().concurrencyLevel(
+        conf.getNettyServerExecutionConcurrency()).makeMap();
+    initializeRepresentativeState();
+  }
+
+  @Override
+  public Vertex<I, V, E, M> getVertex(I vertexIndex) {
+    byte[] vertexData = vertexMap.get(vertexIndex);
+    if (vertexData == null) {
+      return null;
+    }
+    WritableUtils.readFieldsFromByteArrayWithSize(
+        vertexData, representativeVertex, useUnsafeSerialization);
+    return representativeVertex;
+  }
+
+  /**
+   * Serialize and store a vertex.
+   *
+   * NOTE(review): the map key is vertex.getId() itself, not a copy. If the
+   * caller later mutates that id object (e.g. a reused/representative
+   * vertex), a newly inserted key would be corrupted.
+   *
+   * @param vertex Vertex to store
+   * @return The previous vertex for the same id (the representative vertex),
+   *         or null if there was none
+   */
+  @Override
+  public Vertex<I, V, E, M> putVertex(Vertex<I, V, E, M> vertex) {
+    byte[] vertexData =
+        WritableUtils.writeToByteArrayWithSize(vertex, useUnsafeSerialization);
+    byte[] oldVertexBytes = vertexMap.put(vertex.getId(), vertexData);
+    if (oldVertexBytes == null) {
+      return null;
+    } else {
+      WritableUtils.readFieldsFromByteArrayWithSize(
+          oldVertexBytes, representativeVertex, useUnsafeSerialization);
+      return representativeVertex;
+    }
+  }
+
+  @Override
+  public Vertex<I, V, E, M> removeVertex(I vertexIndex) {
+    byte[] vertexBytes = vertexMap.remove(vertexIndex);
+    if (vertexBytes == null) {
+      return null;
+    }
+    WritableUtils.readFieldsFromByteArrayWithSize(vertexBytes,
+        representativeVertex, useUnsafeSerialization);
+    return representativeVertex;
+  }
+
+  /**
+   * Merge all vertices of another ByteArrayPartition into this one (the
+   * serialized byte arrays are shared, not copied).
+   *
+   * @param partition Partition to merge; must be a ByteArrayPartition
+   * @throws IllegalStateException if partition is of another type
+   */
+  @Override
+  public void addPartition(Partition<I, V, E, M> partition) {
+    // Only work with other ByteArrayPartition instances
+    if (!(partition instanceof ByteArrayPartition)) {
+      throw new IllegalStateException("addPartition: Cannot add partition " +
+          "of type " + partition.getClass());
+    }
+
+    ByteArrayPartition<I, V, E, M> byteArrayPartition =
+        (ByteArrayPartition<I, V, E, M>) partition;
+    vertexMap.putAll(byteArrayPartition.vertexMap);
+  }
+
+  @Override
+  public long getVertexCount() {
+    return vertexMap.size();
+  }
+
+  /**
+   * Count edges by deserializing every vertex (expensive; also overwrites
+   * the representative vertex).
+   *
+   * @return Total number of edges in this partition
+   */
+  @Override
+  public long getEdgeCount() {
+    long edges = 0;
+    for (byte[] vertexBytes : vertexMap.values()) {
+      WritableUtils.readFieldsFromByteArrayWithSize(vertexBytes,
+          representativeVertex, useUnsafeSerialization);
+      edges += representativeVertex.getNumEdges();
+    }
+    return edges;
+  }
+
+  @Override
+  public int getId() {
+    return id;
+  }
+
+  @Override
+  public void setId(int id) {
+    this.id = id;
+  }
+
+  @Override
+  public void setProgressable(Progressable progressable) {
+    this.progressable = progressable;
+  }
+
+  /**
+   * Re-serialize a (possibly modified) vertex back into the map.
+   *
+   * @param vertex Vertex to save
+   */
+  @Override
+  public void saveVertex(Vertex<I, V, E, M> vertex) {
+    // Reuse the old buffer whenever possible
+    byte[] oldVertexData = vertexMap.get(vertex.getId());
+    if (oldVertexData != null) {
+      vertexMap.put(vertex.getId(),
+          WritableUtils.writeToByteArrayWithSize(
+              vertex, oldVertexData, useUnsafeSerialization));
+    } else {
+      vertexMap.put(vertex.getId(),
+          WritableUtils.writeToByteArrayWithSize(
+              vertex, useUnsafeSerialization));
+    }
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    output.writeInt(id);
+    output.writeInt(vertexMap.size());
+    for (Map.Entry<I, byte[]> entry : vertexMap.entrySet()) {
+      if (progressable != null) {
+        progressable.progress();
+      }
+      entry.getKey().write(output);
+      // Note here that we are writing the size of the vertex data first
+      // as it is encoded in the first four bytes of the byte[]. The
+      // encoded size is assumed to include those four header bytes:
+      // readFields() stores exactly vertexDataSize bytes and later parses
+      // the header back out of them.
+      int vertexDataSize;
+      if (useUnsafeSerialization) {
+        vertexDataSize = UnsafeByteArrayInputStream.getInt(entry.getValue(),
+            0);
+      } else {
+        vertexDataSize = Ints.fromByteArray(entry.getValue());
+      }
+
+      output.writeInt(vertexDataSize);
+      output.write(entry.getValue(), 0, vertexDataSize);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    id = input.readInt();
+    int size = input.readInt();
+    vertexMap = new MapMaker().concurrencyLevel(
+        conf.getNettyServerExecutionConcurrency()).initialCapacity(
+        size).makeMap();
+    initializeRepresentativeState();
+    for (int i = 0; i < size; ++i) {
+      if (progressable != null) {
+        progressable.progress();
+      }
+      I vertexId = conf.createVertexId();
+      vertexId.readFields(input);
+      int vertexDataSize = input.readInt();
+      byte[] vertexData = new byte[vertexDataSize];
+      input.readFully(vertexData);
+      if (vertexMap.put(vertexId, vertexData) != null) {
+        throw new IllegalStateException("readFields: Already saw vertex " +
+            vertexId);
+      }
+    }
+  }
+
+  @Override
+  public void setConf(
+      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration) {
+    conf = configuration;
+  }
+
+  @Override
+  public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
+    return conf;
+  }
+
+  @Override
+  public Iterator<Vertex<I, V, E, M>> iterator() {
+    return new RepresentativeVertexIterator();
+  }
+
+  /**
+   * Iterator that deserializes a vertex from a byte array on the fly, using
+   * the same representative vertex object.
+   */
+  private class RepresentativeVertexIterator implements
+      Iterator<Vertex<I, V, E, M>> {
+    /** Iterator to the vertex values */
+    private final Iterator<byte[]> vertexDataIterator =
+        vertexMap.values().iterator();
+
+    @Override
+    public boolean hasNext() {
+      return vertexDataIterator.hasNext();
+    }
+
+    @Override
+    public Vertex<I, V, E, M> next() {
+      WritableUtils.readFieldsFromByteArrayWithSize(
+          vertexDataIterator.next(), representativeVertex,
+          useUnsafeSerialization);
+      return representativeVertex;
+    }
+
+    /**
+     * Removal through the iterator is not supported.
+     *
+     * @throws UnsupportedOperationException always (per the Iterator contract)
+     */
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException(
+          "remove: This method is not supported.");
+    }
+  }
+}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java Thu Nov 15 20:17:38 2012
@@ -37,7 +37,6 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
-import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
@@ -146,7 +145,7 @@ public class DiskBackedPartitionStore<I
file.createNewFile();
DataOutputStream outputStream = new DataOutputStream(
new BufferedOutputStream(new FileOutputStream(file)));
- for (Vertex<I, V, E, M> vertex : partition.getVertices()) {
+ for (Vertex<I, V, E, M> vertex : partition) {
vertex.write(outputStream);
}
outputStream.close();
@@ -161,8 +160,8 @@ public class DiskBackedPartitionStore<I
*/
private Partition<I, V, E, M> readPartition(Integer partitionId)
throws IOException {
- Partition<I, V, E, M> partition = new Partition<I, V, E, M>(conf,
- partitionId, context);
+ Partition<I, V, E, M> partition =
+ conf.createPartition(partitionId, context);
File file = new File(getPartitionPath(partitionId));
DataInputStream inputStream = new DataInputStream(
new BufferedInputStream(new FileInputStream(file)));
@@ -178,19 +177,17 @@ public class DiskBackedPartitionStore<I
}
/**
- * Append some vertices to an out-of-core partition.
+ * Append some vertices of another partition to an out-of-core partition.
*
- * @param partitionId Id of the destination partition
- * @param vertices Vertices to be added
+ * @param partition Partition to add
* @throws IOException
*/
- private void appendVertices(Integer partitionId,
- Collection<Vertex<I, V, E, M>> vertices)
+ private void appendPartitionOutOfCore(Partition<I, V, E, M> partition)
throws IOException {
- File file = new File(getPartitionPath(partitionId));
+ File file = new File(getPartitionPath(partition.getId()));
DataOutputStream outputStream = new DataOutputStream(
new BufferedOutputStream(new FileOutputStream(file, true)));
- for (Vertex<I, V, E, M> vertex : vertices) {
+ for (Vertex<I, V, E, M> vertex : partition) {
vertex.write(outputStream);
}
outputStream.close();
@@ -208,12 +205,12 @@ public class DiskBackedPartitionStore<I
}
if (LOG.isInfoEnabled()) {
LOG.info("loadPartition: moving partition " + loadedPartition.getId() +
- " out of core");
+ " out of core with size " + loadedPartition.getVertexCount());
}
try {
writePartition(loadedPartition);
onDiskPartitions.put(loadedPartition.getId(),
- loadedPartition.getVertices().size());
+ (int) loadedPartition.getVertexCount());
loadedPartition = null;
} catch (IOException e) {
throw new IllegalStateException("loadPartition: failed writing " +
@@ -247,7 +244,8 @@ public class DiskBackedPartitionStore<I
}
try {
writePartition(partition);
- onDiskPartitions.put(partition.getId(), partition.getVertices().size());
+ onDiskPartitions.put(partition.getId(),
+ (int) partition.getVertexCount());
} catch (IOException e) {
throw new IllegalStateException("addPartition: failed writing " +
"partition " + partition.getId() + "to disk");
@@ -256,51 +254,42 @@ public class DiskBackedPartitionStore<I
@Override
public void addPartition(Partition<I, V, E, M> partition) {
- Lock lock = createLock(partition.getId());
- if (lock == null) {
- throw new IllegalStateException("addPartition: partition " +
- partition.getId() + " already exists");
- }
- addPartitionNoLock(partition);
- lock.unlock();
- }
-
- @Override
- public void addPartitionVertices(Integer partitionId,
- Collection<Vertex<I, V, E, M>> vertices) {
- if (inMemoryPartitions.containsKey(partitionId)) {
- Partition<I, V, E, M> partition = inMemoryPartitions.get(partitionId);
- partition.putVertices(vertices);
- } else if (onDiskPartitions.containsKey(partitionId)) {
- Lock lock = getLock(partitionId);
+ if (inMemoryPartitions.containsKey(partition.getId())) {
+ Partition<I, V, E, M> existingPartition =
+ inMemoryPartitions.get(partition.getId());
+ existingPartition.addPartition(partition);
+ } else if (onDiskPartitions.containsKey(partition.getId())) {
+ Lock lock = getLock(partition.getId());
lock.lock();
+ // Unlock in a finally block: a failed append rethrows as an
+ // IllegalStateException and must not leave the partition locked.
+ try {
- if (loadedPartition != null && loadedPartition.getId() == partitionId) {
- loadedPartition.putVertices(vertices);
+ if (loadedPartition != null && loadedPartition.getId() ==
+ partition.getId()) {
+ loadedPartition.addPartition(partition);
} else {
try {
- appendVertices(partitionId, vertices);
- onDiskPartitions.put(partitionId,
- onDiskPartitions.get(partitionId) + vertices.size());
+ appendPartitionOutOfCore(partition);
+ onDiskPartitions.put(partition.getId(),
+ onDiskPartitions.get(partition.getId()) +
+ (int) partition.getVertexCount());
} catch (IOException e) {
- throw new IllegalStateException("addPartitionVertices: failed " +
- "writing vertices to partition " + partitionId + " on disk", e);
+ throw new IllegalStateException("addPartition: failed " +
+ "writing vertices to partition " + partition.getId() + " on disk",
+ e);
}
}
+ } finally {
lock.unlock();
+ }
} else {
- Lock lock = createLock(partitionId);
+ Lock lock = createLock(partition.getId());
if (lock != null) {
- addPartitionNoLock(
- new Partition<I, V, E, M>(conf, partitionId, context));
+ // The lock returned here is released without a prior lock() call, so
+ // it appears to be held already; release it even if
+ // addPartitionNoLock() throws.
+ try {
+ addPartitionNoLock(partition);
+ } finally {
lock.unlock();
+ }
} else {
// Another thread is already creating the partition,
// so we make sure it's done before repeating the call.
- lock = getLock(partitionId);
+ lock = getLock(partition.getId());
lock.lock();
lock.unlock();
+ addPartition(partition);
}
- addPartitionVertices(partitionId, vertices);
}
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/Partition.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/Partition.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/Partition.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/Partition.java Thu Nov 15 20:17:38 2012
@@ -18,21 +18,12 @@
package org.apache.giraph.graph.partition;
-import org.apache.giraph.GiraphConfiguration;
-import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.ImmutableClassesGiraphConfigurable;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper;
-import com.google.common.collect.Maps;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.hadoop.util.Progressable;
/**
* A generic container that stores vertices. Vertex ids will map to exactly
@@ -44,38 +35,17 @@ import java.util.concurrent.ConcurrentSk
* @param <M> Message data
*/
@SuppressWarnings("rawtypes")
-public class Partition<I extends WritableComparable,
+public interface Partition<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
- implements Writable {
- /** Configuration from the worker */
- private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
- /** Partition id */
- private final int id;
- /** Vertex map for this range (keyed by index) */
- private final ConcurrentMap<I, Vertex<I, V, E, M>> vertexMap;
- /** Context used to report progress */
- private final Mapper<?, ?, ?, ?>.Context context;
-
- /**
- * Constructor.
- *
- * @param conf Configuration.
- * @param id Partition id.
- * @param context Mapper context
- */
- public Partition(ImmutableClassesGiraphConfiguration<I, V, E, M> conf,
- int id,
- Mapper<?, ?, ?, ?>.Context context) {
- this.conf = conf;
- this.id = id;
- this.context = context;
- if (conf.getBoolean(GiraphConfiguration.USE_OUT_OF_CORE_MESSAGES,
- GiraphConfiguration.USE_OUT_OF_CORE_MESSAGES_DEFAULT)) {
- vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E, M>>();
- } else {
- vertexMap = Maps.newConcurrentMap();
- }
- }
+ extends Writable, ImmutableClassesGiraphConfigurable<I, V, E, M>,
+ Iterable<Vertex<I, V, E, M>> {
+ /**
+ * Set up this partition. Callers guarantee this is invoked before the
+ * partition is used in any other way.
+ *
+ * @param partitionId Id to assign to this partition
+ * @param progressable Progressable on which to report progress
+ */
+ void initialize(int partitionId, Progressable progressable);
/**
* Get the vertex for this vertex index.
@@ -83,9 +53,7 @@ public class Partition<I extends Writabl
* @param vertexIndex Vertex index to search for
* @return Vertex if it exists, null otherwise
*/
- public Vertex<I, V, E, M> getVertex(I vertexIndex) {
- return vertexMap.get(vertexIndex);
- }
+ Vertex<I, V, E, M> getVertex(I vertexIndex);
/**
* Put a vertex into the Partition
@@ -93,9 +61,7 @@ public class Partition<I extends Writabl
* @param vertex Vertex to put in the Partition
* @return old vertex value (i.e. null if none existed prior)
*/
- public Vertex<I, V, E, M> putVertex(Vertex<I, V, E, M> vertex) {
- return vertexMap.put(vertex.getId(), vertex);
- }
+ Vertex<I, V, E, M> putVertex(Vertex<I, V, E, M> vertex);
/**
* Remove a vertex from the Partition
@@ -103,79 +69,54 @@ public class Partition<I extends Writabl
* @param vertexIndex Vertex index to remove
* @return The removed vertex.
*/
- public Vertex<I, V, E, M> removeVertex(I vertexIndex) {
- return vertexMap.remove(vertexIndex);
- }
+ Vertex<I, V, E, M> removeVertex(I vertexIndex);
/**
- * Get a collection of the vertices.
+ * Add the vertices of another partition to this one.
*
- * @return Collection of the vertices
+ * @param partition Partition whose vertices are added
*/
- public Collection<Vertex<I, V, E , M>> getVertices() {
- return vertexMap.values();
- }
+ void addPartition(Partition<I, V, E, M> partition);
/**
- * Put several vertices in the partition.
+ * Count the vertices held by this partition.
*
- * @param vertices Vertices to add
+ * @return Number of vertices
*/
- public void putVertices(Collection<Vertex<I, V, E , M>> vertices) {
- for (Vertex<I, V, E , M> vertex : vertices) {
- vertexMap.put(vertex.getId(), vertex);
- }
- }
+ long getVertexCount();
/**
- * Get the number of edges in this partition. Computed on the fly.
+ * Count the edges held by this partition.
*
* @return Number of edges.
*/
- public long getEdgeCount() {
- long edges = 0;
- for (Vertex<I, V, E, M> vertex : vertexMap.values()) {
- edges += vertex.getNumEdges();
- }
- return edges;
- }
+ long getEdgeCount();
/**
* Get the partition id.
*
* @return Id of this partition.
*/
- public int getId() {
- return id;
- }
-
- @Override
- public String toString() {
- return "(id=" + getId() + ",V=" + vertexMap.size() +
- ",E=" + getEdgeCount() + ")";
- }
-
- @Override
- public void readFields(DataInput input) throws IOException {
- int vertices = input.readInt();
- for (int i = 0; i < vertices; ++i) {
- Vertex<I, V, E, M> vertex = conf.createVertex();
- context.progress();
- vertex.readFields(input);
- if (vertexMap.put(vertex.getId(), vertex) != null) {
- throw new IllegalStateException(
- "readFields: " + this +
- " already has same id " + vertex);
- }
- }
- }
-
- @Override
- public void write(DataOutput output) throws IOException {
- output.writeInt(vertexMap.size());
- for (Vertex vertex : vertexMap.values()) {
- context.progress();
- vertex.write(output);
- }
- }
+ int getId();
+
+ /**
+ * Assign a new id to this partition.
+ *
+ * @param id New id of this partition
+ */
+ void setId(int id);
+
+ /**
+ * Set the progressable used to report progress.
+ *
+ * @param progressable Progressable on which to report progress
+ */
+ void setProgressable(Progressable progressable);
+
+ /**
+ * Write a possibly modified vertex back into this partition.
+ *
+ * @param vertex Vertex whose current state should be stored
+ */
+ void saveVertex(Vertex<I, V, E, M> vertex);
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java Thu Nov 15 20:17:38 2012
@@ -18,14 +18,10 @@
package org.apache.giraph.graph.partition;
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
-
-import java.util.Collection;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
/**
* Structure that stores partitions for a worker.
@@ -39,22 +35,14 @@ public abstract class PartitionStore<I e
V extends Writable, E extends Writable, M extends Writable> {
/**
- * Add a new partition to the store.
+ * Add a new partition to the store or, if a partition with the same id is
+ * already stored, add this partition's vertices to that existing partition.
*
- * @param partition Partition
+ * @param partition Partition to add
*/
public abstract void addPartition(Partition<I, V, E, M> partition);
/**
- * Add some vertices to a (possibly existing) partition.
- *
- * @param partitionId Id of the destination partition
- * @param vertices Vertices
- */
- public abstract void addPartitionVertices(
- Integer partitionId, Collection<Vertex<I, V, E, M>> vertices);
-
- /**
* Get a partition.
*
* @param partitionId Partition id