You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/11/15 21:17:44 UTC
svn commit: r1409973 [2/3] - in /giraph/trunk: ./
giraph/src/main/java/org/apache/giraph/
giraph/src/main/java/org/apache/giraph/benchmark/
giraph/src/main/java/org/apache/giraph/comm/
giraph/src/main/java/org/apache/giraph/comm/aggregators/ giraph/src...
Copied: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/SimplePartition.java (from r1408926, giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/Partition.java)
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/SimplePartition.java?p2=giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/SimplePartition.java&p1=giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/Partition.java&r1=1408926&r2=1409973&rev=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/Partition.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/SimplePartition.java Thu Nov 15 20:17:38 2012
@@ -18,25 +18,23 @@
package org.apache.giraph.graph.partition;
-import org.apache.giraph.GiraphConfiguration;
-import org.apache.giraph.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper;
-
import com.google.common.collect.Maps;
-
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.Collection;
+import java.util.Iterator;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.Progressable;
/**
- * A generic container that stores vertices. Vertex ids will map to exactly
- * one partition.
+ * A simple map-based container that stores vertices. Vertex ids will map to
+ * exactly one partition.
*
* @param <I> Vertex id
* @param <V> Vertex data
@@ -44,31 +42,27 @@ import java.util.concurrent.ConcurrentSk
* @param <M> Message data
*/
@SuppressWarnings("rawtypes")
-public class Partition<I extends WritableComparable,
+public class SimplePartition<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
- implements Writable {
+ implements Partition<I, V, E, M> {
/** Configuration from the worker */
- private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+ private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
/** Partition id */
- private final int id;
+ private int id;
/** Vertex map for this range (keyed by index) */
- private final ConcurrentMap<I, Vertex<I, V, E, M>> vertexMap;
+ private ConcurrentMap<I, Vertex<I, V, E, M>> vertexMap;
/** Context used to report progress */
- private final Mapper<?, ?, ?, ?>.Context context;
+ private Progressable progressable;
/**
- * Constructor.
- *
- * @param conf Configuration.
- * @param id Partition id.
- * @param context Mapper context
+ * Constructor for reflection.
*/
- public Partition(ImmutableClassesGiraphConfiguration<I, V, E, M> conf,
- int id,
- Mapper<?, ?, ?, ?>.Context context) {
- this.conf = conf;
- this.id = id;
- this.context = context;
+ public SimplePartition() { }
+
+ @Override
+ public void initialize(int partitionId, Progressable progressable) {
+ setId(partitionId);
+ setProgressable(progressable);
if (conf.getBoolean(GiraphConfiguration.USE_OUT_OF_CORE_MESSAGES,
GiraphConfiguration.USE_OUT_OF_CORE_MESSAGES_DEFAULT)) {
vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E, M>>();
@@ -77,61 +71,34 @@ public class Partition<I extends Writabl
}
}
- /**
- * Get the vertex for this vertex index.
- *
- * @param vertexIndex Vertex index to search for
- * @return Vertex if it exists, null otherwise
- */
+ @Override
public Vertex<I, V, E, M> getVertex(I vertexIndex) {
return vertexMap.get(vertexIndex);
}
- /**
- * Put a vertex into the Partition
- *
- * @param vertex Vertex to put in the Partition
- * @return old vertex value (i.e. null if none existed prior)
- */
+ @Override
public Vertex<I, V, E, M> putVertex(Vertex<I, V, E, M> vertex) {
return vertexMap.put(vertex.getId(), vertex);
}
- /**
- * Remove a vertex from the Partition
- *
- * @param vertexIndex Vertex index to remove
- * @return The removed vertex.
- */
+ @Override
public Vertex<I, V, E, M> removeVertex(I vertexIndex) {
return vertexMap.remove(vertexIndex);
}
- /**
- * Get a collection of the vertices.
- *
- * @return Collection of the vertices
- */
- public Collection<Vertex<I, V, E , M>> getVertices() {
- return vertexMap.values();
- }
-
- /**
- * Put several vertices in the partition.
- *
- * @param vertices Vertices to add
- */
- public void putVertices(Collection<Vertex<I, V, E , M>> vertices) {
- for (Vertex<I, V, E , M> vertex : vertices) {
+ @Override
+ public void addPartition(Partition<I, V, E, M> partition) {
+ for (Vertex<I, V, E , M> vertex : partition) {
vertexMap.put(vertex.getId(), vertex);
}
}
- /**
- * Get the number of edges in this partition. Computed on the fly.
- *
- * @return Number of edges.
- */
+ @Override
+ public long getVertexCount() {
+ return vertexMap.size();
+ }
+
+ @Override
public long getEdgeCount() {
long edges = 0;
for (Vertex<I, V, E, M> vertex : vertexMap.values()) {
@@ -140,27 +107,46 @@ public class Partition<I extends Writabl
return edges;
}
- /**
- * Get the partition id.
- *
- * @return Id of this partition.
- */
+ @Override
public int getId() {
return id;
}
@Override
+ public void setId(int id) {
+ this.id = id;
+ }
+
+ @Override
+ public void setProgressable(Progressable progressable) {
+ this.progressable = progressable;
+ }
+
+ @Override
+ public void saveVertex(Vertex<I, V, E, M> vertex) {
+ // No-op, vertices are stored as Java objects in this partition
+ }
+
+ @Override
public String toString() {
- return "(id=" + getId() + ",V=" + vertexMap.size() +
- ",E=" + getEdgeCount() + ")";
+ return "(id=" + getId() + ",V=" + vertexMap.size() + ")";
}
@Override
public void readFields(DataInput input) throws IOException {
+ if (conf.getBoolean(GiraphConfiguration.USE_OUT_OF_CORE_MESSAGES,
+ GiraphConfiguration.USE_OUT_OF_CORE_MESSAGES_DEFAULT)) {
+ vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E, M>>();
+ } else {
+ vertexMap = Maps.newConcurrentMap();
+ }
+ id = input.readInt();
int vertices = input.readInt();
for (int i = 0; i < vertices; ++i) {
Vertex<I, V, E, M> vertex = conf.createVertex();
- context.progress();
+ if (progressable != null) {
+ progressable.progress();
+ }
vertex.readFields(input);
if (vertexMap.put(vertex.getId(), vertex) != null) {
throw new IllegalStateException(
@@ -172,10 +158,29 @@ public class Partition<I extends Writabl
@Override
public void write(DataOutput output) throws IOException {
+ output.writeInt(id);
output.writeInt(vertexMap.size());
for (Vertex vertex : vertexMap.values()) {
- context.progress();
+ if (progressable != null) {
+ progressable.progress();
+ }
vertex.write(output);
}
}
+
+ @Override
+ public void setConf(
+ ImmutableClassesGiraphConfiguration<I, V, E, M> configuration) {
+ this.conf = configuration;
+ }
+
+ @Override
+ public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
+ return conf;
+ }
+
+ @Override
+ public Iterator<Vertex<I, V, E, M>> iterator() {
+ return vertexMap.values().iterator();
+ }
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java Thu Nov 15 20:17:38 2012
@@ -19,14 +19,12 @@
package org.apache.giraph.graph.partition;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import com.google.common.collect.Maps;
-import java.util.Collection;
import java.util.concurrent.ConcurrentMap;
/**
@@ -63,25 +61,14 @@ public class SimplePartitionStore<I exte
@Override
public void addPartition(Partition<I, V, E, M> partition) {
- if (partitions.putIfAbsent(partition.getId(), partition) != null) {
- throw new IllegalStateException("addPartition: partition " +
- partition.getId() + " already exists");
- }
- }
-
- @Override
- public void addPartitionVertices(Integer partitionId,
- Collection<Vertex<I, V, E, M>> vertices) {
- Partition<I, V, E, M> partition = partitions.get(partitionId);
- if (partition == null) {
- Partition<I, V, E, M> newPartition = new Partition<I, V, E, M>(conf,
- partitionId, context);
- partition = partitions.putIfAbsent(partitionId, newPartition);
- if (partition == null) {
- partition = newPartition;
+ Partition<I, V, E, M> oldPartition = partitions.get(partition.getId());
+ if (oldPartition == null) {
+ oldPartition = partitions.putIfAbsent(partition.getId(), partition);
+ if (oldPartition == null) {
+ return;
}
}
- partition.putVertices(vertices);
+ oldPartition.addPartition(partition);
}
@Override
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessageCollection.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessageCollection.java?rev=1409973&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessageCollection.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessageCollection.java Thu Nov 15 20:17:38 2012
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.giraph.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Vertex id message collection that stores everything in a single byte array
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ */
+public class ByteArrayVertexIdMessageCollection<I extends WritableComparable,
+ M extends Writable> implements Writable,
+ ImmutableClassesGiraphConfigurable {
+ /** Extended data output */
+ private ExtendedDataOutput extendedDataOutput;
+ /** Configuration */
+ private ImmutableClassesGiraphConfiguration<I, ?, ?, M> configuration;
+
+ /**
+ * Constructor for reflection
+ */
+ public ByteArrayVertexIdMessageCollection() { }
+
+ /**
+ * Initialize the inner state. Must be called before {@code add()} is
+ * called.
+ */
+ public void initialize() {
+ extendedDataOutput = configuration.createExtendedDataOutput();
+ }
+
+ /**
+ * Initialize the inner state, with a known size. Must be called before
+ * {@code add()} is called.
+ *
+ * @param expectedSize Number of bytes to be expected
+ */
+ public void initialize(int expectedSize) {
+ extendedDataOutput = configuration.createExtendedDataOutput(expectedSize);
+ }
+
+ /**
+ * Add a vertex id and message pair to the collection.
+ *
+ * @param vertexId Vertex id
+ * @param message Message
+ */
+ public void add(I vertexId, M message) {
+ try {
+ vertexId.write(extendedDataOutput);
+ message.write(extendedDataOutput);
+ } catch (IOException e) {
+ throw new IllegalStateException("add: IOException", e);
+ }
+ }
+
+ /**
+ * Get the number of bytes used
+ *
+ * @return Number of bytes used
+ */
+ public int getSize() {
+ return extendedDataOutput.getPos();
+ }
+
+ /**
+ * Check if the list is empty.
+ *
+ * @return True iff there are no pairs in the list
+ */
+ public boolean isEmpty() {
+ return extendedDataOutput.getPos() == 0;
+ }
+
+ /**
+ * Get iterator through elements of this object.
+ *
+ * @return {@link Iterator} iterator
+ */
+ public Iterator getIterator() {
+ return new Iterator();
+ }
+
+ @Override
+ public void setConf(ImmutableClassesGiraphConfiguration configuration) {
+ this.configuration = configuration;
+ }
+
+ @Override
+ public ImmutableClassesGiraphConfiguration getConf() {
+ return configuration;
+ }
+
+ @Override
+ public void write(DataOutput dataOutput) throws IOException {
+ dataOutput.writeInt(extendedDataOutput.getPos());
+ dataOutput.write(extendedDataOutput.getByteArray(), 0,
+ extendedDataOutput.getPos());
+ }
+
+ @Override
+ public void readFields(DataInput dataInput) throws IOException {
+ int size = dataInput.readInt();
+ byte[] buf = new byte[size];
+ dataInput.readFully(buf);
+ extendedDataOutput = configuration.createExtendedDataOutput(buf, size);
+ }
+
+ /**
+ * Special iterator class which we'll use to iterate through elements of
+ * {@link ByteArrayVertexIdMessageCollection}, without having to create a new
+ * object as wrapper for each pair.
+ *
+ * Protocol is somewhat similar to the protocol of {@link java.util.Iterator}
+ * only here next() doesn't return the next object, it just moves along in
+ * the collection. Values related to current pair can be retrieved by calling
+ * getCurrentVertexId() and getCurrentMessage() methods.
+ *
+ * Not thread-safe.
+ */
+ public class Iterator {
+ /** Reader of the serialized messages */
+ private ExtendedDataInput extendedDataInput =
+ configuration.createExtendedDataInput(
+ extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
+ /** Current vertex id */
+ private I vertexId;
+ /** Current message */
+ private M message;
+
+ /**
+ * Returns true if the iteration has more elements.
+ *
+ * @return True if the iteration has more elements.
+ */
+ public boolean hasNext() {
+ return extendedDataInput.available() > 0;
+ }
+
+ /**
+ * Moves to the next element in the iteration.
+ */
+ public void next() {
+ vertexId = configuration.createVertexId();
+ message = configuration.createMessageValue();
+ try {
+ vertexId.readFields(extendedDataInput);
+ message.readFields(extendedDataInput);
+ } catch (IOException e) {
+ throw new IllegalStateException("next: IOException", e);
+ }
+ }
+
+ /**
+ * Get the current vertex id
+ *
+ * @return Current vertex id
+ */
+ public I getCurrentVertexId() {
+ return vertexId;
+ }
+
+ /**
+ * Get the current message
+ *
+ * @return Current message
+ */
+ public M getCurrentMessage() {
+ return message;
+ }
+ }
+}
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/DynamicChannelBufferInputStream.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/DynamicChannelBufferInputStream.java?rev=1409973&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/DynamicChannelBufferInputStream.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/DynamicChannelBufferInputStream.java Thu Nov 15 20:17:38 2012
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import org.jboss.netty.buffer.DynamicChannelBuffer;
+
+/**
+ * Special input that reads from a DynamicChannelBuffer.
+ */
+public class DynamicChannelBufferInputStream implements DataInput {
+ /** Internal dynamic channel buffer */
+ private DynamicChannelBuffer buffer;
+
+ /**
+ * Constructor.
+ *
+ * @param buffer Buffer to read from
+ */
+ public DynamicChannelBufferInputStream(DynamicChannelBuffer buffer) {
+ this.buffer = buffer;
+ }
+
+ @Override
+ public void readFully(byte[] b) throws IOException {
+ buffer.readBytes(b);
+ }
+
+ @Override
+ public void readFully(byte[] b, int off, int len) throws IOException {
+ buffer.readBytes(b, off, len);
+ }
+
+ @Override
+ public int skipBytes(int n) throws IOException {
+ buffer.skipBytes(n);
+ return n;
+ }
+
+ /**
+ * Reads one byte and interprets it as a boolean.
+ *
+ * @return False if the byte is zero, true for any other value
+ * @throws IOException Declared by {@link DataInput}
+ */
+ @Override
+ public boolean readBoolean() throws IOException {
+   // buffer.readByte() is signed, so bytes 0x80-0xFF arrive as negative
+   // values. They are ordinary non-zero data (there is no -1 end marker on a
+   // channel buffer) and DataInput defines every non-zero byte as true.
+   return buffer.readByte() != 0;
+ }
+
+ @Override
+ public byte readByte() throws IOException {
+ return buffer.readByte();
+ }
+
+ @Override
+ public int readUnsignedByte() throws IOException {
+ return buffer.readUnsignedByte();
+ }
+
+ @Override
+ public short readShort() throws IOException {
+ return buffer.readShort();
+ }
+
+ @Override
+ public int readUnsignedShort() throws IOException {
+ return buffer.readUnsignedShort();
+ }
+
+ @Override
+ public char readChar() throws IOException {
+ return buffer.readChar();
+ }
+
+ @Override
+ public int readInt() throws IOException {
+ return buffer.readInt();
+ }
+
+ @Override
+ public long readLong() throws IOException {
+ return buffer.readLong();
+ }
+
+ @Override
+ public float readFloat() throws IOException {
+ return buffer.readFloat();
+ }
+
+ @Override
+ public double readDouble() throws IOException {
+ return buffer.readDouble();
+ }
+
+ /**
+ * Reads one line of text, ending at {@code \n}, {@code \r}, {@code \r\n} or
+ * the end of the readable bytes. The terminator is consumed but not
+ * returned. Each byte is zero-extended to a char.
+ *
+ * @return The line without its terminator, or null if no bytes were readable
+ * @throws IOException Declared by {@link DataInput}
+ */
+ @Override
+ public String readLine() throws IOException {
+   // The buffer never hands back -1 at its end, so the end of input has to be
+   // detected with readable() instead of a sentinel value.
+   if (!buffer.readable()) {
+     return null;
+   }
+   StringBuilder line = new StringBuilder(128);
+   while (buffer.readable()) {
+     // Unsigned read: 0xFF is ordinary data, not an end-of-stream marker
+     int c = buffer.readUnsignedByte();
+     if (c == '\n') {
+       break;
+     }
+     if (c == '\r') {
+       // Swallow the '\n' of a "\r\n" pair, leave any other byte unread
+       if (buffer.readable() &&
+           buffer.getByte(buffer.readerIndex()) == '\n') {
+         buffer.skipBytes(1);
+       }
+       break;
+     }
+     line.append((char) c);
+   }
+   return line.toString();
+ }
+
+ /**
+ * Reads a string stored as a two-byte length (high byte first) followed by
+ * that many bytes of modified UTF-8.
+ *
+ * @return Decoded string
+ * @throws IOException If the encoded bytes are malformed
+ */
+ @Override
+ public String readUTF() throws IOException {
+   // Note that this code is mostly copied from DataInputStream.
+   // The length prefix is assembled by hand, high byte first, because
+   // buffer.readUnsignedShort() follows the buffer's byte order and would
+   // swap the two bytes on a little-endian buffer.
+   int utflen = (buffer.readUnsignedByte() << 8) | buffer.readUnsignedByte();
+
+   byte[] bytearr = new byte[utflen];
+   char[] chararr = new char[utflen];
+
+   int c;
+   int char2;
+   int char3;
+   int count = 0;
+   int chararrCount = 0;
+
+   buffer.readBytes(bytearr, 0, utflen);
+
+   // Fast path: leading run of plain 7-bit characters
+   while (count < utflen) {
+     c = (int) bytearr[count] & 0xff;
+     if (c > 127) {
+       break;
+     }
+     count++;
+     chararr[chararrCount++] = (char) c;
+   }
+
+   // General path: the top four bits of the lead byte select the sequence length
+   while (count < utflen) {
+     c = (int) bytearr[count] & 0xff;
+     switch (c >> 4) {
+     case 0:
+     case 1:
+     case 2:
+     case 3:
+     case 4:
+     case 5:
+     case 6:
+     case 7:
+       /* 0xxxxxxx */
+       count++;
+       chararr[chararrCount++] = (char) c;
+       break;
+     case 12:
+     case 13:
+       /* 110x xxxx 10xx xxxx*/
+       count += 2;
+       if (count > utflen) {
+         throw new UTFDataFormatException(
+             "malformed input: partial character at end");
+       }
+       char2 = (int) bytearr[count - 1];
+       if ((char2 & 0xC0) != 0x80) {
+         throw new UTFDataFormatException(
+             "malformed input around byte " + count);
+       }
+       chararr[chararrCount++] = (char) (((c & 0x1F) << 6) |
+           (char2 & 0x3F));
+       break;
+     case 14:
+       /* 1110 xxxx 10xx xxxx 10xx xxxx */
+       count += 3;
+       if (count > utflen) {
+         throw new UTFDataFormatException(
+             "malformed input: partial character at end");
+       }
+       char2 = (int) bytearr[count - 2];
+       char3 = (int) bytearr[count - 1];
+       if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
+         throw new UTFDataFormatException(
+             "malformed input around byte " + (count - 1));
+       }
+       chararr[chararrCount++] = (char) (((c & 0x0F) << 12) |
+           ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
+       break;
+     default:
+       /* 10xx xxxx, 1111 xxxx */
+       throw new UTFDataFormatException(
+           "malformed input around byte " + count);
+     }
+   }
+   // The number of chars produced may be less than utflen
+   return new String(chararr, 0, chararrCount);
+ }
+}
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/DynamicChannelBufferOutputStream.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/DynamicChannelBufferOutputStream.java?rev=1409973&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/DynamicChannelBufferOutputStream.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/DynamicChannelBufferOutputStream.java Thu Nov 15 20:17:38 2012
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteOrder;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.buffer.DirectChannelBufferFactory;
+import org.jboss.netty.buffer.DynamicChannelBuffer;
+
+/**
+ * Special output stream that can grow as needed and dumps to a
+ * DynamicChannelBuffer.
+ */
+public class DynamicChannelBufferOutputStream implements DataOutput {
+ /** Internal dynamic channel buffer */
+ private DynamicChannelBuffer buffer;
+
+ /**
+ * Constructor
+ *
+ * @param estimatedLength Estimated length of the buffer
+ */
+ public DynamicChannelBufferOutputStream(int estimatedLength) {
+ buffer = (DynamicChannelBuffer)
+ ChannelBuffers.dynamicBuffer(ByteOrder.LITTLE_ENDIAN,
+ estimatedLength, DirectChannelBufferFactory.getInstance());
+ }
+
+ /**
+ * Constructor with the buffer to use
+ *
+ * @param buffer Buffer to be written to (cleared before use)
+ */
+ public DynamicChannelBufferOutputStream(DynamicChannelBuffer buffer) {
+ this.buffer = buffer;
+ buffer.clear();
+ }
+
+ /**
+ * Get the dynamic channel buffer
+ *
+ * @return dynamic channel buffer (not a copy)
+ */
+ public DynamicChannelBuffer getDynamicChannelBuffer() {
+ return buffer;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ buffer.writeByte(b);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ buffer.writeBytes(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ buffer.writeBytes(b, off, len);
+ }
+
+ @Override
+ public void writeBoolean(boolean v) throws IOException {
+ buffer.writeByte(v ? 1 : 0);
+ }
+
+ @Override
+ public void writeByte(int v) throws IOException {
+ buffer.writeByte(v);
+ }
+
+ @Override
+ public void writeShort(int v) throws IOException {
+ buffer.writeShort(v);
+ }
+
+ @Override
+ public void writeChar(int v) throws IOException {
+ buffer.writeChar(v);
+ }
+
+ @Override
+ public void writeInt(int v) throws IOException {
+ buffer.writeInt(v);
+ }
+
+ @Override
+ public void writeLong(long v) throws IOException {
+ buffer.writeLong(v);
+ }
+
+ @Override
+ public void writeFloat(float v) throws IOException {
+ buffer.writeFloat(v);
+ }
+
+ @Override
+ public void writeDouble(double v) throws IOException {
+ buffer.writeDouble(v);
+ }
+
+ @Override
+ public void writeBytes(String s) throws IOException {
+ // Note that this code is mostly copied from DataOutputStream
+ int len = s.length();
+ for (int i = 0; i < len; i++) {
+ buffer.writeByte((byte) s.charAt(i));
+ }
+ }
+
+ /**
+ * Writes every char of the string, two bytes per char, exactly as
+ * {@link #writeChar(int)} would write it.
+ *
+ * @param s String whose chars are written
+ * @throws IOException Declared by {@link DataOutput}
+ */
+ @Override
+ public void writeChars(String s) throws IOException {
+   // Go through buffer.writeChar() so that the bytes follow the buffer's own
+   // byte order, the same as writeChar(). Hand-written high-byte-first pairs
+   // would disagree with writeChar() on a little-endian buffer.
+   int len = s.length();
+   for (int i = 0; i < len; i++) {
+     buffer.writeChar(s.charAt(i));
+   }
+ }
+
+ /**
+ * Writes the string as a two-byte length (high byte first) followed by its
+ * modified UTF-8 encoding.
+ *
+ * @param s String to write
+ * @throws IOException If the encoded form is longer than 65535 bytes
+ */
+ @Override
+ public void writeUTF(String s) throws IOException {
+   // Note that this code is mostly copied from DataOutputStream
+   int strlen = s.length();
+   int utflen = 0;
+   int c;
+
+   /* use charAt instead of copying String to char array */
+   for (int i = 0; i < strlen; i++) {
+     c = s.charAt(i);
+     if ((c >= 0x0001) && (c <= 0x007F)) {
+       utflen++;
+     } else if (c > 0x07FF) {
+       utflen += 3;
+     } else {
+       utflen += 2;
+     }
+   }
+
+   // The length prefix is only two bytes wide; refuse anything that would be
+   // truncated rather than emit a stream that cannot be read back.
+   if (utflen > 65535) {
+     throw new java.io.UTFDataFormatException(
+         "encoded string too long: " + utflen + " bytes");
+   }
+
+   buffer.writeByte((byte) ((utflen >>> 8) & 0xFF));
+   buffer.writeByte((byte) ((utflen >>> 0) & 0xFF));
+
+   for (int i = 0; i < strlen; i++) {
+     c = s.charAt(i);
+     if ((c >= 0x0001) && (c <= 0x007F)) {
+       /* one byte: 0xxxxxxx */
+       buffer.writeByte((byte) c);
+     } else if (c > 0x07FF) {
+       /* three bytes: 1110 xxxx 10xx xxxx 10xx xxxx */
+       buffer.writeByte((byte) (0xE0 | ((c >> 12) & 0x0F)));
+       buffer.writeByte((byte) (0x80 | ((c >> 6) & 0x3F)));
+       buffer.writeByte((byte) (0x80 | ((c >> 0) & 0x3F)));
+     } else {
+       /* two bytes, including the NUL char: 110x xxxx 10xx xxxx */
+       buffer.writeByte((byte) (0xC0 | ((c >> 6) & 0x1F)));
+       buffer.writeByte((byte) (0x80 | ((c >> 0) & 0x3F)));
+     }
+   }
+ }
+}
+
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java?rev=1409973&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java Thu Nov 15 20:17:38 2012
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+/**
+ * Provides access to the internals of ByteArrayInputStream
+ */
+public class ExtendedByteArrayDataInput extends ByteArrayInputStream
+ implements ExtendedDataInput {
+ /** Internal data input */
+ private final DataInput dataInput;
+ /**
+ * Constructor
+ *
+ * @param buf Buffer to read
+ */
+ public ExtendedByteArrayDataInput(byte[] buf) {
+ super(buf);
+ dataInput = new DataInputStream(this);
+ }
+
+ /**
+ * Get access to portion of a byte array
+ *
+ * @param buf Byte array to access
+ * @param offset Offset into the byte array
+ * @param length Length to read
+ */
+ public ExtendedByteArrayDataInput(byte[] buf, int offset, int length) {
+ super(buf, offset, length);
+ dataInput = new DataInputStream(this);
+ }
+
+ @Override
+ public int getPos() {
+ return pos;
+ }
+
+ @Override
+ public void readFully(byte[] b) throws IOException {
+ dataInput.readFully(b);
+ }
+
+ @Override
+ public void readFully(byte[] b, int off, int len) throws IOException {
+ dataInput.readFully(b, off, len);
+ }
+
+ @Override
+ public int skipBytes(int n) throws IOException {
+ return dataInput.skipBytes(n);
+ }
+
+ @Override
+ public boolean readBoolean() throws IOException {
+ return dataInput.readBoolean();
+ }
+
+ @Override
+ public byte readByte() throws IOException {
+ return dataInput.readByte();
+ }
+
+ @Override
+ public int readUnsignedByte() throws IOException {
+ return dataInput.readUnsignedByte();
+ }
+
+ @Override
+ public short readShort() throws IOException {
+ return dataInput.readShort();
+ }
+
+ @Override
+ public int readUnsignedShort() throws IOException {
+ return dataInput.readUnsignedShort();
+ }
+
+ @Override
+ public char readChar() throws IOException {
+ return dataInput.readChar();
+ }
+
+ @Override
+ public int readInt() throws IOException {
+ return dataInput.readInt();
+ }
+
+ @Override
+ public long readLong() throws IOException {
+ return dataInput.readLong();
+ }
+
+ @Override
+ public float readFloat() throws IOException {
+ return dataInput.readFloat();
+ }
+
+ @Override
+ public double readDouble() throws IOException {
+ return dataInput.readDouble();
+ }
+
+ @Override
+ public String readLine() throws IOException {
+ return dataInput.readLine();
+ }
+
+ @Override
+ public String readUTF() throws IOException {
+ return dataInput.readUTF();
+ }
+}
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java?rev=1409973&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java Thu Nov 15 20:17:38 2012
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * Adds some functionality to ByteArrayOutputStream,
+ * such as an option to write int value over previously written data
+ * and directly get the byte array.
+ */
+public class ExtendedByteArrayDataOutput extends ByteArrayOutputStream
+ implements ExtendedDataOutput {
+ /** Default number of bytes */
+ private static final int DEFAULT_BYTES = 32;
+ /** Internal data output */
+ private final DataOutput dataOutput;
+
+ /**
+ * Uses the byte array provided or if null, use a default size
+ *
+ * @param buf Buffer to use
+ */
+ public ExtendedByteArrayDataOutput(byte[] buf) {
+ if (buf == null) {
+ this.buf = new byte[DEFAULT_BYTES];
+ } else {
+ this.buf = buf;
+ }
+ dataOutput = new DataOutputStream(this);
+ }
+
+ /**
+ * Uses the byte array provided at the given pos
+ *
+ * @param buf Buffer to use
+ * @param pos Position in the buffer to start writing from
+ */
+ public ExtendedByteArrayDataOutput(byte[] buf, int pos) {
+ this.buf = buf;
+ this.count = pos;
+ dataOutput = new DataOutputStream(this);
+ }
+
+ /**
+ * Creates a new byte array output stream. The buffer capacity is
+ * initially 32 bytes, though its size increases if necessary.
+ */
+ public ExtendedByteArrayDataOutput() {
+ this(DEFAULT_BYTES);
+ }
+
+ /**
+ * Creates a new byte array output stream, with a buffer capacity of
+ * the specified size, in bytes.
+ *
+ * @param size the initial size.
+ * @exception IllegalArgumentException if size is negative.
+ */
+ public ExtendedByteArrayDataOutput(int size) {
+ if (size < 0) {
+ throw new IllegalArgumentException("Negative initial size: " +
+ size);
+ }
+ buf = new byte[size];
+ dataOutput = new DataOutputStream(this);
+ }
+
+ @Override
+ public void writeBoolean(boolean v) throws IOException {
+ dataOutput.writeBoolean(v);
+ }
+
+ @Override
+ public void writeByte(int v) throws IOException {
+ dataOutput.writeByte(v);
+ }
+
+ @Override
+ public void writeShort(int v) throws IOException {
+ dataOutput.writeShort(v);
+ }
+
+ @Override
+ public void writeChar(int v) throws IOException {
+ dataOutput.writeChar(v);
+ }
+
+ @Override
+ public void writeInt(int v) throws IOException {
+ dataOutput.writeInt(v);
+ }
+
+ @Override
+ public void writeLong(long v) throws IOException {
+ dataOutput.writeLong(v);
+ }
+
+ @Override
+ public void writeFloat(float v) throws IOException {
+ dataOutput.writeFloat(v);
+ }
+
+ @Override
+ public void writeDouble(double v) throws IOException {
+ dataOutput.writeDouble(v);
+ }
+
+ @Override
+ public void writeBytes(String s) throws IOException {
+ dataOutput.writeBytes(s);
+ }
+
+ @Override
+ public void writeChars(String s) throws IOException {
+ dataOutput.writeChars(s);
+ }
+
+ @Override
+ public void writeUTF(String s) throws IOException {
+ dataOutput.writeUTF(s);
+ }
+
+ @Override
+ public void writeInt(int position, int value) {
+ if (position + 4 > count) {
+ throw new IndexOutOfBoundsException(
+ "writeIntOnPosition: Tried to write int to position " + position +
+ " but current length is " + count);
+ }
+ buf[position] = (byte) ((value >>> 24) & 0xFF);
+ buf[position + 1] = (byte) ((value >>> 16) & 0xFF);
+ buf[position + 2] = (byte) ((value >>> 8) & 0xFF);
+ buf[position + 3] = (byte) ((value >>> 0) & 0xFF);
+ }
+
+ @Override
+ public byte[] getByteArray() {
+ return buf;
+ }
+
+ @Override
+ public int getPos() {
+ return count;
+ }
+}
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedDataInput.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedDataInput.java?rev=1409973&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedDataInput.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedDataInput.java Thu Nov 15 20:17:38 2012
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.DataInput;
+
+/**
+ * Add some functionality to data input
+ */
+public interface ExtendedDataInput extends DataInput {
+ /**
+ * Get the position of what has been read
+ *
+ * @return How many bytes have been read?
+ */
+ int getPos();
+
+ /**
+ * How many bytes are available?
+ *
+ * @return Bytes available
+ */
+ int available();
+}
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java?rev=1409973&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java Thu Nov 15 20:17:38 2012
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.DataOutput;
+
+/**
+ * Add a few features to data output
+ */
+public interface ExtendedDataOutput extends DataOutput {
+ /**
+ * In order to write a size as a first part of an data output, it is
+ * useful to be able to write an int at an arbitrary location in the stream
+ *
+ * @param pos Byte position in the output stream
+ * @param value Value to write
+ */
+ void writeInt(int pos, int value);
+
+ /**
+ * Get the position in the output stream
+ *
+ * @return Position in the output stream
+ */
+ int getPos();
+
+ /**
+ * Get the internal byte array (if possible), read-only
+ *
+ * @return Internal byte array (do not modify)
+ */
+ byte[] getByteArray();
+
+ /**
+ * Copies the internal byte array
+ *
+ * @return Copied byte array
+ */
+ byte[] toByteArray();
+
+ /**
+ * Clears the buffer
+ */
+ void reset();
+}
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/UnsafeByteArrayInputStream.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/UnsafeByteArrayInputStream.java?rev=1409973&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/UnsafeByteArrayInputStream.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/UnsafeByteArrayInputStream.java Thu Nov 15 20:17:38 2012
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.lang.reflect.Field;
+import org.apache.log4j.Logger;
+
+/**
+ * Byte array input stream that uses Unsafe methods to deserialize
+ * much faster
+ */
+public class UnsafeByteArrayInputStream implements ExtendedDataInput {
+ /** Class logger */
+ private static final Logger LOG = Logger.getLogger(
+ UnsafeByteArrayInputStream.class);
+ /** Access to the unsafe class */
+ private static final sun.misc.Unsafe UNSAFE;
+ static {
+   try {
+     // Obtain the Unsafe singleton reflectively from its private static
+     // "theUnsafe" field
+     Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
+     field.setAccessible(true);
+     UNSAFE = (sun.misc.Unsafe) field.get(null);
+     // Checkstyle exception due to needing to check if unsafe is allowed
+     // CHECKSTYLE: stop IllegalCatch
+   } catch (Exception e) {
+     // CHECKSTYLE: resume IllegalCatch
+     // Report this class (not the output stream) as the failing one
+     throw new RuntimeException("UnsafeByteArrayInputStream: Failed to " +
+         "get unsafe", e);
+   }
+ }
+ /** Offset of a byte array */
+ private static final long BYTE_ARRAY_OFFSET =
+ UNSAFE.arrayBaseOffset(byte[].class);
+ /** Offset of a long array */
+ private static final long LONG_ARRAY_OFFSET =
+ UNSAFE.arrayBaseOffset(long[].class);
+ /** Offset of a double array */
+ private static final long DOUBLE_ARRAY_OFFSET =
+ UNSAFE.arrayBaseOffset(double[].class);
+
+ /** Byte buffer */
+ private final byte[] buf;
+ /** Buffer length */
+ private final int bufLength;
+ /** Position in the buffer */
+ private int pos = 0;
+
+ /**
+ * Constructor
+ *
+ * @param buf Buffer to read from
+ */
+ public UnsafeByteArrayInputStream(byte[] buf) {
+ this.buf = buf;
+ this.bufLength = buf.length;
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param buf Buffer to read from
+ * @param offset Offset in the buffer to start reading from
+ * @param length Max length of the buffer to read
+ */
+ public UnsafeByteArrayInputStream(byte[] buf, int offset, int length) {
+ this.buf = buf;
+ this.pos = offset;
+ // NOTE(review): length is stored as the absolute end index (available()
+ // and ensureRemaining() compute bufLength - pos), not as a byte count
+ // relative to offset -- confirm callers pass it that way when offset is
+ // non-zero.
+ this.bufLength = length;
+ }
+
+ /**
+ * How many bytes are still available?
+ *
+ * @return Number of bytes available
+ */
+ public int available() {
+ return bufLength - pos;
+ }
+
+ /**
+ * What position in the stream?
+ *
+ * @return Position
+ */
+ public int getPos() {
+ return pos;
+ }
+
+ /**
+ * Check whether there are enough remaining bytes for an operation
+ *
+ * @param requiredBytes Bytes required to read
+ * @throws IOException When there are not enough bytes to read
+ */
+ private void ensureRemaining(int requiredBytes) throws IOException {
+ if (bufLength - pos < requiredBytes) {
+ throw new IOException("ensureRemaining: Only " + (bufLength - pos) +
+ " bytes remaining, trying to read " + requiredBytes);
+ }
+ }
+
+ @Override
+ public void readFully(byte[] b) throws IOException {
+ ensureRemaining(b.length);
+ System.arraycopy(buf, pos, b, 0, b.length);
+ pos += b.length;
+ }
+
+ @Override
+ public void readFully(byte[] b, int off, int len) throws IOException {
+ ensureRemaining(len);
+ System.arraycopy(buf, pos, b, off, len);
+ pos += len;
+ }
+
+ /**
+ * Skip over n bytes of the buffer.
+ *
+ * @param n Number of bytes to skip; zero or negative skips nothing
+ * @return Number of bytes actually skipped
+ * @throws IOException When fewer than n bytes remain
+ */
+ @Override
+ public int skipBytes(int n) throws IOException {
+   // A non-positive request would otherwise pass ensureRemaining() and move
+   // the position backwards
+   if (n <= 0) {
+     return 0;
+   }
+   ensureRemaining(n);
+   pos += n;
+   return n;
+ }
+
+ @Override
+ public boolean readBoolean() throws IOException {
+ ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_BOOLEAN);
+ boolean value = UNSAFE.getBoolean(buf,
+ BYTE_ARRAY_OFFSET + pos);
+ pos += UnsafeByteArrayOutputStream.SIZE_OF_BOOLEAN;
+ return value;
+ }
+
+ @Override
+ public byte readByte() throws IOException {
+ ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_BYTE);
+ byte value = UNSAFE.getByte(buf,
+ BYTE_ARRAY_OFFSET + pos);
+ pos += UnsafeByteArrayOutputStream.SIZE_OF_BYTE;
+ return value;
+ }
+
+ @Override
+ public int readUnsignedByte() throws IOException {
+ return (short) (readByte() & 0xFF);
+ }
+
+ @Override
+ public short readShort() throws IOException {
+ ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_SHORT);
+ short value = UNSAFE.getShort(buf,
+ BYTE_ARRAY_OFFSET + pos);
+ pos += UnsafeByteArrayOutputStream.SIZE_OF_SHORT;
+ return value;
+ }
+
+ @Override
+ public int readUnsignedShort() throws IOException {
+ return readShort() & 0xFFFF;
+ }
+
+ @Override
+ public char readChar() throws IOException {
+ ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_CHAR);
+ char value = UNSAFE.getChar(buf,
+ BYTE_ARRAY_OFFSET + pos);
+ pos += UnsafeByteArrayOutputStream.SIZE_OF_CHAR;
+ return value;
+ }
+
+ @Override
+ public int readInt() throws IOException {
+ ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_INT);
+ int value = UNSAFE.getInt(buf,
+ BYTE_ARRAY_OFFSET + pos);
+ pos += UnsafeByteArrayOutputStream.SIZE_OF_INT;
+ return value;
+ }
+
+ @Override
+ public long readLong() throws IOException {
+ ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_LONG);
+ long value = UNSAFE.getLong(buf,
+ BYTE_ARRAY_OFFSET + pos);
+ pos += UnsafeByteArrayOutputStream.SIZE_OF_LONG;
+ return value;
+ }
+
+ @Override
+ public float readFloat() throws IOException {
+ ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_FLOAT);
+ float value = UNSAFE.getFloat(buf,
+ BYTE_ARRAY_OFFSET + pos);
+ pos += UnsafeByteArrayOutputStream.SIZE_OF_FLOAT;
+ return value;
+ }
+
+ @Override
+ public double readDouble() throws IOException {
+ ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_DOUBLE);
+ double value = UNSAFE.getDouble(buf,
+ BYTE_ARRAY_OFFSET + pos);
+ pos += UnsafeByteArrayOutputStream.SIZE_OF_DOUBLE;
+ return value;
+ }
+
+ /**
+ * Read the next line of text, treating every byte as one character.
+ * A line ends at '\n', at '\r' (optionally followed by '\n') or at the end
+ * of the readable range; the terminator is consumed but not returned.
+ *
+ * @return The line without its terminator, or null if no bytes are left
+ * @throws IOException Declared by DataInput; not thrown by this
+ *         implementation
+ */
+ @Override
+ public String readLine() throws IOException {
+   // readByte() throws at the end of the buffer instead of returning -1,
+   // so the end of input is detected from the position directly
+   if (pos >= bufLength) {
+     return null;
+   }
+
+   StringBuilder line = new StringBuilder();
+   while (pos < bufLength) {
+     // Mask to an unsigned value: a plain byte would be sign-extended and
+     // bytes >= 0x80 would turn into the wrong characters
+     int c = buf[pos++] & 0xFF;
+     if (c == '\n') {
+       break;
+     }
+     if (c == '\r') {
+       // Swallow the '\n' of a "\r\n" pair, but nothing else
+       if (pos < bufLength && buf[pos] == '\n') {
+         pos++;
+       }
+       break;
+     }
+     line.append((char) c);
+   }
+   return line.toString();
+ }
+
+ @Override
+ public String readUTF() throws IOException {
+ // Note that this code is mostly copied from DataInputStream
+ int utflen = readUnsignedShort();
+
+ byte[] bytearr = new byte[utflen];
+ char[] chararr = new char[utflen];
+
+ int c;
+ int char2;
+ int char3;
+ int count = 0;
+ int chararrCount = 0;
+
+ readFully(bytearr, 0, utflen);
+
+ while (count < utflen) {
+ c = (int) bytearr[count] & 0xff;
+ if (c > 127) {
+ break;
+ }
+ count++;
+ chararr[chararrCount++] = (char) c;
+ }
+
+ while (count < utflen) {
+ c = (int) bytearr[count] & 0xff;
+ switch (c >> 4) {
+ case 0:
+ case 1:
+ case 2:
+ case 3:
+ case 4:
+ case 5:
+ case 6:
+ case 7:
+ /* 0xxxxxxx */
+ count++;
+ chararr[chararrCount++] = (char) c;
+ break;
+ case 12:
+ case 13:
+ /* 110x xxxx 10xx xxxx*/
+ count += 2;
+ if (count > utflen) {
+ throw new UTFDataFormatException(
+ "malformed input: partial character at end");
+ }
+ char2 = (int) bytearr[count - 1];
+ if ((char2 & 0xC0) != 0x80) {
+ throw new UTFDataFormatException(
+ "malformed input around byte " + count);
+ }
+ chararr[chararrCount++] = (char) (((c & 0x1F) << 6) |
+ (char2 & 0x3F));
+ break;
+ case 14:
+ /* 1110 xxxx 10xx xxxx 10xx xxxx */
+ count += 3;
+ if (count > utflen) {
+ throw new UTFDataFormatException(
+ "malformed input: partial character at end");
+ }
+ char2 = (int) bytearr[count - 2];
+ char3 = (int) bytearr[count - 1];
+ if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
+ throw new UTFDataFormatException(
+ "malformed input around byte " + (count - 1));
+ }
+ chararr[chararrCount++] = (char) (((c & 0x0F) << 12) |
+ ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
+ break;
+ default:
+ /* 10xx xxxx, 1111 xxxx */
+ throw new UTFDataFormatException(
+ "malformed input around byte " + count);
+ }
+ }
+ // The number of chars produced may be less than utflen
+ return new String(chararr, 0, chararrCount);
+ }
+
+ /**
+ * Get an int at an arbitrary position in a byte[]
+ *
+ * @param buf Buffer to get the int from
+ * @param pos Position in the buffer to get the int from
+ * @return Int at the buffer position
+ * @throws IndexOutOfBoundsException If the four bytes starting at pos do
+ *         not lie completely inside buf
+ */
+ public static int getInt(byte[] buf, int pos) {
+   // Unsafe performs no bounds checking of its own, so an invalid position
+   // would read memory outside of the array
+   if (pos < 0 ||
+       pos > buf.length - UnsafeByteArrayOutputStream.SIZE_OF_INT) {
+     throw new IndexOutOfBoundsException(
+         "getInt: Tried to read int at position " + pos +
+         " but buffer length is " + buf.length);
+   }
+   return UNSAFE.getInt(buf,
+       BYTE_ARRAY_OFFSET + pos);
+ }
+}
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java?rev=1409973&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java Thu Nov 15 20:17:38 2012
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+
+/**
+ * Byte array output stream that uses Unsafe methods to serialize/deserialize
+ * much faster
+ */
+public class UnsafeByteArrayOutputStream extends OutputStream
+ implements ExtendedDataOutput {
+ static {
+ try {
+ Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
+ field.setAccessible(true);
+ UNSAFE = (sun.misc.Unsafe) field.get(null);
+ // Checkstyle exception due to needing to check if unsafe is allowed
+ // CHECKSTYLE: stop IllegalCatch
+ } catch (Exception e) {
+ // CHECKSTYLE: resume IllegalCatch
+ throw new RuntimeException("UnsafeByteArrayOutputStream: Failed to " +
+ "get unsafe", e);
+ }
+ }
+
+ /** Bytes used in a boolean */
+ public static final int SIZE_OF_BOOLEAN = 1;
+ /** Bytes used in a byte */
+ public static final int SIZE_OF_BYTE = 1;
+ /** Bytes used in a char */
+ public static final int SIZE_OF_CHAR = 2;
+ /** Bytes used in a short */
+ public static final int SIZE_OF_SHORT = 2;
+ /** Bytes used in a medium */
+ public static final int SIZE_OF_MEDIUM = 3;
+ /** Bytes used in an int */
+ public static final int SIZE_OF_INT = 4;
+ /** Bytes used in a float */
+ public static final int SIZE_OF_FLOAT = 4;
+ /** Bytes used in a long */
+ public static final int SIZE_OF_LONG = 8;
+ /** Bytes used in a double */
+ public static final int SIZE_OF_DOUBLE = 8;
+ /** Default number of bytes */
+ private static final int DEFAULT_BYTES = 32;
+ /** Access to the unsafe class */
+ private static final sun.misc.Unsafe UNSAFE;
+
+ /** Offset of a byte array */
+ private static final long BYTE_ARRAY_OFFSET =
+ UNSAFE.arrayBaseOffset(byte[].class);
+ /** Offset of a long array */
+ private static final long LONG_ARRAY_OFFSET =
+ UNSAFE.arrayBaseOffset(long[].class);
+ /** Offset of a double array */
+ private static final long DOUBLE_ARRAY_OFFSET =
+ UNSAFE.arrayBaseOffset(double[].class);
+
+ /** Byte buffer */
+ private byte[] buf;
+ /** Position in the buffer */
+ private int pos = 0;
+
+ /**
+ * Constructor
+ */
+ public UnsafeByteArrayOutputStream() {
+ this(DEFAULT_BYTES);
+ }
+
+ /**
+ * Constructor
+ *
+ * @param size Initial size of the underlying byte array
+ */
+ public UnsafeByteArrayOutputStream(int size) {
+ buf = new byte[size];
+ }
+
+ /**
+ * Constructor to take in a buffer
+ *
+ * @param buf Buffer to start with, or if null, create own buffer
+ */
+ public UnsafeByteArrayOutputStream(byte[] buf) {
+ if (buf == null) {
+ this.buf = new byte[DEFAULT_BYTES];
+ } else {
+ this.buf = buf;
+ }
+ }
+
+ /**
+ * Constructor to take in a buffer with a given position into that buffer
+ *
+ * @param buf Buffer to start with
+ * @param pos Position to write at the buffer
+ */
+ public UnsafeByteArrayOutputStream(byte[] buf, int pos) {
+ this.buf = buf;
+ this.pos = pos;
+ }
+
+ /**
+ * Ensure that this buffer has enough remaining space to add the size.
+ * Creates and copies to a new buffer if necessary
+ *
+ * @param size Size to add
+ */
+ private void ensureSize(int size) {
+ if (pos + size > buf.length) {
+ byte[] newBuf = new byte[(buf.length + size) << 1];
+ System.arraycopy(buf, 0, newBuf, 0, pos);
+ buf = newBuf;
+ }
+ }
+
+ @Override
+ public byte[] getByteArray() {
+ return buf;
+ }
+
+ @Override
+ public byte[] toByteArray() {
+ return Arrays.copyOf(buf, pos);
+
+ }
+
+ @Override
+ public void reset() {
+ pos = 0;
+ }
+
+ @Override
+ public int getPos() {
+ return pos;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ ensureSize(SIZE_OF_BYTE);
+ buf[pos] = (byte) b;
+ pos += SIZE_OF_BYTE;
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ ensureSize(b.length);
+ System.arraycopy(b, 0, buf, pos, b.length);
+ pos += b.length;
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ ensureSize(len);
+ System.arraycopy(b, off, buf, pos, len);
+ pos += len;
+ }
+
+ @Override
+ public void writeBoolean(boolean v) throws IOException {
+ ensureSize(SIZE_OF_BOOLEAN);
+ UNSAFE.putBoolean(buf, BYTE_ARRAY_OFFSET + pos, v);
+ pos += SIZE_OF_BOOLEAN;
+ }
+
+ @Override
+ public void writeByte(int v) throws IOException {
+ ensureSize(SIZE_OF_BYTE);
+ UNSAFE.putByte(buf, BYTE_ARRAY_OFFSET + pos, (byte) v);
+ pos += SIZE_OF_BYTE;
+ }
+
+ @Override
+ public void writeShort(int v) throws IOException {
+ ensureSize(SIZE_OF_SHORT);
+ UNSAFE.putShort(buf, BYTE_ARRAY_OFFSET + pos, (short) v);
+ pos += SIZE_OF_SHORT;
+ }
+
+ @Override
+ public void writeChar(int v) throws IOException {
+ ensureSize(SIZE_OF_CHAR);
+ UNSAFE.putChar(buf, BYTE_ARRAY_OFFSET + pos, (char) v);
+ pos += SIZE_OF_CHAR;
+ }
+
+ @Override
+ public void writeInt(int v) throws IOException {
+ ensureSize(SIZE_OF_INT);
+ UNSAFE.putInt(buf, BYTE_ARRAY_OFFSET + pos, v);
+ pos += SIZE_OF_INT;
+ }
+
+ /**
+ * Overwrite four already written bytes with an int value.
+ *
+ * @param pos Byte position of the int to overwrite
+ * @param value Value to write
+ * @throws IndexOutOfBoundsException If pos is negative or the int would
+ *         extend past the current write position
+ */
+ @Override
+ public void writeInt(int pos, int value) {
+   // Unsafe performs no bounds checking, so a negative position must be
+   // rejected here or the write lands in memory before the array.
+   // Comparing against this.pos - SIZE_OF_INT avoids int overflow.
+   if (pos < 0 || pos > this.pos - SIZE_OF_INT) {
+     throw new IndexOutOfBoundsException(
+         "writeInt: Tried to write int to position " + pos +
+         " but current length is " + this.pos);
+   }
+   UNSAFE.putInt(buf, BYTE_ARRAY_OFFSET + pos, value);
+ }
+
+ @Override
+ public void writeLong(long v) throws IOException {
+ ensureSize(SIZE_OF_LONG);
+ UNSAFE.putLong(buf, BYTE_ARRAY_OFFSET + pos, v);
+ pos += SIZE_OF_LONG;
+ }
+
+ @Override
+ public void writeFloat(float v) throws IOException {
+ ensureSize(SIZE_OF_FLOAT);
+ UNSAFE.putFloat(buf, BYTE_ARRAY_OFFSET + pos, v);
+ pos += SIZE_OF_FLOAT;
+ }
+
+ @Override
+ public void writeDouble(double v) throws IOException {
+ ensureSize(SIZE_OF_DOUBLE);
+ UNSAFE.putDouble(buf, BYTE_ARRAY_OFFSET + pos, v);
+ pos += SIZE_OF_DOUBLE;
+ }
+
+ @Override
+ public void writeBytes(String s) throws IOException {
+ // Note that this code is mostly copied from DataOutputStream
+ int len = s.length();
+ ensureSize(len);
+ for (int i = 0; i < len; i++) {
+ buf[pos++] = (byte) s.charAt(i);
+ }
+ }
+
+ /**
+ * Write every char of the string as a two-byte value.
+ *
+ * @param s String to write
+ * @throws IOException Declared by DataOutput
+ */
+ @Override
+ public void writeChars(String s) throws IOException {
+   int len = s.length();
+   // Each char occupies SIZE_OF_CHAR bytes, so reserve room for all of them
+   // up front rather than for len bytes only
+   ensureSize(len * SIZE_OF_CHAR);
+   for (int i = 0; i < len; i++) {
+     // Go through writeChar() so the byte order is the one that
+     // UnsafeByteArrayInputStream.readChar() reads back
+     writeChar(s.charAt(i));
+   }
+ }
+
+ /**
+ * Write a string as a two-byte length followed by its modified UTF-8
+ * encoding.
+ *
+ * @param s String to write
+ * @throws IOException If the encoded string is longer than 65535 bytes
+ */
+ @Override
+ public void writeUTF(String s) throws IOException {
+   // Note that this code is mostly copied from DataOutputStream
+   int strlen = s.length();
+   int utflen = 0;
+   int c;
+
+   /* use charAt instead of copying String to char array */
+   for (int i = 0; i < strlen; i++) {
+     c = s.charAt(i);
+     if ((c >= 0x0001) && (c <= 0x007F)) {
+       utflen++;
+     } else if (c > 0x07FF) {
+       utflen += 3;
+     } else {
+       utflen += 2;
+     }
+   }
+
+   // The length prefix is an unsigned short, longer strings cannot be
+   // represented
+   if (utflen > 65535) {
+     throw new java.io.UTFDataFormatException(
+         "writeUTF: Encoded string too long: " + utflen + " bytes");
+   }
+
+   // Reserve room for the prefix and the payload in one go; the raw
+   // buf[pos++] stores below do not grow the buffer themselves
+   ensureSize(SIZE_OF_SHORT + utflen);
+   // Use writeShort() so the prefix has the byte order that
+   // UnsafeByteArrayInputStream.readUTF() reads via readUnsignedShort()
+   writeShort(utflen);
+
+   int i = 0;
+   // Fast path for the leading run of single-byte characters
+   for (i = 0; i < strlen; i++) {
+     c = s.charAt(i);
+     if (!((c >= 0x0001) && (c <= 0x007F))) {
+       break;
+     }
+     buf[pos++] = (byte) c;
+   }
+
+   for (; i < strlen; i++) {
+     c = s.charAt(i);
+     if ((c >= 0x0001) && (c <= 0x007F)) {
+       buf[pos++] = (byte) c;
+     } else if (c > 0x07FF) {
+       buf[pos++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
+       buf[pos++] = (byte) (0x80 | ((c >> 6) & 0x3F));
+       buf[pos++] = (byte) (0x80 | ((c >> 0) & 0x3F));
+     } else {
+       buf[pos++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
+       buf[pos++] = (byte) (0x80 | ((c >> 0) & 0x3F));
+     }
+   }
+ }
+}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/WritableUtils.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/WritableUtils.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/WritableUtils.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/WritableUtils.java Thu Nov 15 20:17:38 2012
@@ -26,7 +26,6 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-
import org.apache.giraph.zk.ZooKeeperExt;
import org.apache.giraph.zk.ZooKeeperExt.PathStat;
import org.apache.hadoop.conf.Configuration;
@@ -110,6 +109,74 @@ public class WritableUtils {
}
/**
+ * Read fields from byteArray to a Writeable object, skipping the size.
+ * Serialization method is choosable
+ *
+ * @param byteArray Byte array to find the fields in.
+ * @param writableObject Object to fill in the fields.
+ * @param unsafe Use unsafe deserialization
+ */
+ public static void readFieldsFromByteArrayWithSize(
+ byte[] byteArray, Writable writableObject, boolean unsafe) {
+ ExtendedDataInput extendedDataInput;
+ if (unsafe) {
+ extendedDataInput = new UnsafeByteArrayInputStream(byteArray);
+ } else {
+ extendedDataInput = new ExtendedByteArrayDataInput(byteArray);
+ }
+ try {
+ extendedDataInput.readInt();
+ writableObject.readFields(extendedDataInput);
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ "readFieldsFromByteArrayWithSize: IOException", e);
+ }
+ }
+
+ /**
+ * Write object to a byte array with the first 4 bytes as the size of the
+ * entire buffer (including the size).
+ *
+ * @param writableObject Object to write from.
+ * @param unsafe Use unsafe serialization?
+ * @return Byte array with serialized object.
+ */
+ public static byte[] writeToByteArrayWithSize(Writable writableObject,
+ boolean unsafe) {
+ return writeToByteArrayWithSize(writableObject, null, unsafe);
+ }
+
+ /**
+ * Write object to a byte array with the first 4 bytes as the size of the
+ * entire buffer (including the size).
+ *
+ * The returned array is the internal array of the data output, not a
+ * trimmed copy, so it may be longer than the size stored in its first
+ * 4 bytes.
+ *
+ * @param writableObject Object to write from.
+ * @param buffer Buffer to serialize into, or null to allocate a new one
+ * @param unsafe Use unsafe serialization?
+ * @return Byte array with serialized object.
+ */
+ public static byte[] writeToByteArrayWithSize(Writable writableObject,
+ byte[] buffer,
+ boolean unsafe) {
+ ExtendedDataOutput extendedDataOutput;
+ if (unsafe) {
+ extendedDataOutput = new UnsafeByteArrayOutputStream(buffer);
+ } else {
+ extendedDataOutput = new ExtendedByteArrayDataOutput(buffer);
+ }
+ try {
+ // Reserve the first 4 bytes with a placeholder; the real size is only
+ // known once the object has been written
+ extendedDataOutput.writeInt(-1);
+ writableObject.write(extendedDataOutput);
+ // Back-patch the placeholder with the total number of bytes written
+ extendedDataOutput.writeInt(0, extendedDataOutput.getPos());
+ } catch (IOException e) {
+ throw new IllegalStateException("writeToByteArrayWithSize: " +
+ "IOException", e);
+ }
+
+ return extendedDataOutput.getByteArray();
+ }
+
+ /**
* Write object to a ZooKeeper znode.
*
* @param zkExt ZooKeeper instance.
Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java Thu Nov 15 20:17:38 2012
@@ -18,6 +18,9 @@
package org.apache.giraph.comm;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.Collection;
import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.comm.netty.NettyClient;
@@ -27,6 +30,7 @@ import org.apache.giraph.comm.requests.S
import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.giraph.graph.EdgeListVertex;
import org.apache.giraph.graph.WorkerInfo;
+import org.apache.giraph.utils.ByteArrayVertexIdMessageCollection;
import org.apache.giraph.utils.MockUtils;
import org.apache.giraph.utils.PairList;
import org.apache.hadoop.io.IntWritable;
@@ -38,11 +42,6 @@ import static org.junit.Assert.assertEqu
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import com.google.common.collect.Lists;
-
-import java.io.IOException;
-import java.util.Collection;
-
/**
* Test all the netty failure scenarios
*/
@@ -83,12 +82,15 @@ public class RequestFailureTest {
private WritableRequest getRequest() {
// Data to send
final int partitionId = 0;
- PairList<Integer, VertexIdMessageCollection<IntWritable, IntWritable>>
+ PairList<Integer, ByteArrayVertexIdMessageCollection<IntWritable,
+ IntWritable>>
dataToSend = new PairList<Integer,
- VertexIdMessageCollection<IntWritable, IntWritable>>();
+ ByteArrayVertexIdMessageCollection<IntWritable, IntWritable>>();
dataToSend.initialize();
- VertexIdMessageCollection<IntWritable, IntWritable> vertexIdMessages =
- new VertexIdMessageCollection<IntWritable, IntWritable>(conf);
+ ByteArrayVertexIdMessageCollection<IntWritable,
+ IntWritable> vertexIdMessages =
+ new ByteArrayVertexIdMessageCollection<IntWritable, IntWritable>();
+ vertexIdMessages.setConf(conf);
vertexIdMessages.initialize();
dataToSend.add(partitionId, vertexIdMessages);
for (int i = 1; i < 7; ++i) {
Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java Thu Nov 15 20:17:38 2012
@@ -18,20 +18,29 @@
package org.apache.giraph.comm;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.comm.netty.NettyClient;
import org.apache.giraph.comm.netty.NettyServer;
import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
-import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
import org.apache.giraph.comm.requests.SendPartitionMutationsRequest;
import org.apache.giraph.comm.requests.SendVertexRequest;
+import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.EdgeListVertex;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexMutations;
import org.apache.giraph.graph.WorkerInfo;
+import org.apache.giraph.graph.partition.Partition;
import org.apache.giraph.graph.partition.PartitionStore;
+import org.apache.giraph.utils.ByteArrayVertexIdMessageCollection;
import org.apache.giraph.utils.MockUtils;
import org.apache.giraph.utils.PairList;
import org.apache.hadoop.io.IntWritable;
@@ -44,16 +53,6 @@ import static org.junit.Assert.assertTru
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
/**
* Test all the different netty requests.
*/
@@ -108,21 +107,19 @@ public class RequestTest {
public void sendVertexPartition() throws IOException {
// Data to send
int partitionId = 13;
- Collection<Vertex<IntWritable, IntWritable, IntWritable,
- IntWritable>> vertices =
- new ArrayList<Vertex<IntWritable, IntWritable,
- IntWritable, IntWritable>>();
+ Partition<IntWritable, IntWritable, IntWritable, IntWritable> partition =
+ conf.createPartition(partitionId, null);
for (int i = 0; i < 10; ++i) {
TestVertex vertex = new TestVertex();
vertex.initialize(new IntWritable(i), new IntWritable(i));
- vertices.add(vertex);
+ partition.putVertex(vertex);
}
// Send the request
SendVertexRequest<IntWritable, IntWritable, IntWritable,
IntWritable> request =
new SendVertexRequest<IntWritable, IntWritable,
- IntWritable, IntWritable>(partitionId, vertices);
+ IntWritable, IntWritable>(partition);
client.sendWritableRequest(workerInfo.getTaskId(), request);
client.waitAllRequests();
@@ -138,7 +135,7 @@ public class RequestTest {
int total = 0;
for (Vertex<IntWritable, IntWritable,
IntWritable, IntWritable> vertex :
- partitionStore.getPartition(partitionId).getVertices()) {
+ partitionStore.getPartition(partitionId)) {
total += vertex.getId().get();
}
assertEquals(total, 45);
@@ -147,13 +144,16 @@ public class RequestTest {
@Test
public void sendWorkerMessagesRequest() throws IOException {
// Data to send
- PairList<Integer, VertexIdMessageCollection<IntWritable, IntWritable>>
+ PairList<Integer, ByteArrayVertexIdMessageCollection<IntWritable,
+ IntWritable>>
dataToSend = new PairList<Integer,
- VertexIdMessageCollection<IntWritable, IntWritable>>();
+ ByteArrayVertexIdMessageCollection<IntWritable, IntWritable>>();
dataToSend.initialize();
int partitionId = 0;
- VertexIdMessageCollection<IntWritable, IntWritable> vertexIdMessages =
- new VertexIdMessageCollection<IntWritable, IntWritable>(conf);
+ ByteArrayVertexIdMessageCollection<IntWritable,
+ IntWritable> vertexIdMessages =
+ new ByteArrayVertexIdMessageCollection<IntWritable, IntWritable>();
+ vertexIdMessages.setConf(conf);
vertexIdMessages.initialize();
dataToSend.add(partitionId, vertexIdMessages);
for (int i = 1; i < 7; ++i) {