You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by pa...@apache.org on 2014/06/08 17:25:30 UTC
[1/3] GIRAPH-907: refactor giraph code to support multiple
implementations of vertexId data (pavanka)
Repository: giraph
Updated Branches:
refs/heads/trunk e92f2942f -> 535a333b7
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdData.java b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdData.java
new file mode 100644
index 0000000..b8f8c8e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdData.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+
+/**
+ * Stores vertex ids and data associated with a vertex
+ *
+ * @param <I> vertexId type parameter
+ * @param <T> vertexData type parameter
+ */
+public interface VertexIdData<I extends WritableComparable, T>
+ extends ImmutableClassesGiraphConfigurable, Writable {
+ /**
+ * Create a new data object.
+ *
+ * @return Newly-created data object.
+ */
+ T createData();
+
+ /**
+ * Write a data object to an {@link ExtendedDataOutput}.
+ *
+ * @param out {@link ExtendedDataOutput}
+ * @param data Data object to write
+ * @throws IOException
+ */
+ void writeData(ExtendedDataOutput out, T data) throws IOException;
+
+ /**
+ * Read a data object's fields from an {@link ExtendedDataInput}.
+ *
+ * @param in {@link ExtendedDataInput}
+ * @param data Data object to fill in-place
+ * @throws IOException
+ */
+ void readData(ExtendedDataInput in, T data) throws IOException;
+
+ /**
+ * Initialize the inner state. Must be called before {@code add()} is
+ * called.
+ */
+ void initialize();
+
+ /**
+ * Initialize the inner state, with a known size. Must be called before
+ * {@code add()} is called.
+ *
+ * @param expectedSize Number of bytes to be expected
+ */
+ void initialize(int expectedSize);
+
+ /**
+ * Add a vertex id and data pair to the collection.
+ *
+ * @param vertexId Vertex id
+ * @param data Data
+ */
+ void add(I vertexId, T data);
+
+ /**
+ * Add a serialized vertex id and data.
+ *
+ * @param serializedId The byte array which holds the serialized id.
+ * @param idPos The end position of the serialized id in the byte array.
+ * @param data Data
+ */
+ void add(byte[] serializedId, int idPos, T data);
+
+ /**
+ * Get the number of bytes used.
+ *
+ * @return Bytes used
+ */
+ int getSize();
+
+ /**
+ * Get the size of this object in serialized form.
+ *
+ * @return The size (in bytes) of the serialized object
+ */
+ int getSerializedSize();
+
+ /**
+ * Check if the list is empty.
+ *
+ * @return Whether the list is empty
+ */
+ boolean isEmpty();
+
+ /**
+ * Clear the list.
+ */
+ void clear();
+
+ /**
+ * Get an iterator over the pairs.
+ *
+ * @return Iterator
+ */
+ VertexIdDataIterator<I, T> getVertexIdDataIterator();
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdDataIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdDataIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdDataIterator.java
new file mode 100644
index 0000000..6aea8ea
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdDataIterator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Special iterator that reuses vertex ids and data objects so that the
+ * lifetime of the object is only until next() is called.
+ *
+ * Vertex id ownership can be released if desired through
+ * releaseCurrentVertexId(). This optimization allows us to cut down
+ * on the number of objects instantiated and garbage collected.
+ *
+ * @param <I> vertexId type parameter
+ * @param <T> vertexData type parameter
+ */
+public interface VertexIdDataIterator<I extends WritableComparable, T>
+ extends VertexIdIterator<I> {
+ /**
+ * Get the current data.
+ *
+ * @return Current data
+ */
+ T getCurrentData();
+
+ /**
+ * Release the current data object.
+ *
+ * @return Released data object
+ */
+ T releaseCurrentData();
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdEdgeIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdEdgeIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdEdgeIterator.java
new file mode 100644
index 0000000..b9c88ec
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdEdgeIterator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Special iterator that reuses vertex ids and edge objects so that the
+ * lifetime of the object is only until next() is called.
+ *
+ * @param <I> vertexId type parameter
+ * @param <E> edge type parameter
+ */
+public interface VertexIdEdgeIterator<I extends WritableComparable,
+ E extends Writable> extends VertexIdDataIterator<I, Edge<I, E>> {
+ /**
+ * Get the current edge.
+ *
+ * @return Current edge
+ */
+ Edge<I, E> getCurrentEdge();
+
+ /**
+ * Release the current edge.
+ *
+ * @return Released edge
+ */
+ Edge<I, E> releaseCurrentEdge();
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdEdges.java b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdEdges.java
new file mode 100644
index 0000000..8f3c03a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdEdges.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Stores vertex id and out-edges of a vertex
+ *
+ * @param <I> vertexId type parameter
+ * @param <E> edge type parameter
+ */
+public interface VertexIdEdges<I extends WritableComparable,
+ E extends Writable> extends VertexIdData<I, Edge<I, E>> {
+ /**
+ * Get an iterator over the pairs.
+ *
+ * @return Iterator
+ */
+ VertexIdEdgeIterator<I, E> getVertexIdEdgeIterator();
+}
+
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdIterator.java
index bad11d6..baaa543 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdIterator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdIterator.java
@@ -18,7 +18,6 @@
package org.apache.giraph.utils;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.hadoop.io.WritableComparable;
/**
@@ -27,38 +26,18 @@ import org.apache.hadoop.io.WritableComparable;
*
* @param <I> Vertex id
*/
-public abstract class VertexIdIterator<I extends WritableComparable> {
- /** Reader of the serialized edges */
- protected final ExtendedDataInput extendedDataInput;
-
- /** Current vertex id */
- protected I vertexId;
-
- /**
- * Constructor.
- *
- * @param extendedDataOutput Extended data output
- * @param configuration Configuration
- */
- public VertexIdIterator(
- ExtendedDataOutput extendedDataOutput,
- ImmutableClassesGiraphConfiguration<I, ?, ?> configuration) {
- extendedDataInput = configuration.createExtendedDataInput(
- extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
- }
-
+public interface VertexIdIterator<I extends WritableComparable> {
/**
* Returns true if the iteration has more elements.
*
* @return True if the iteration has more elements.
*/
- public boolean hasNext() {
- return extendedDataInput.available() > 0;
- }
+ boolean hasNext();
+
/**
* Moves to the next element in the iteration.
*/
- public abstract void next();
+ void next();
/**
* Get the current vertex id. Ihis object's contents are only guaranteed
@@ -67,9 +46,8 @@ public abstract class VertexIdIterator<I extends WritableComparable> {
*
* @return Current vertex id
*/
- public I getCurrentVertexId() {
- return vertexId;
- }
+ I getCurrentVertexId();
+
/**
* The backing store of the current vertex id is now released.
* Further calls to getCurrentVertexId () without calling next()
@@ -77,9 +55,6 @@ public abstract class VertexIdIterator<I extends WritableComparable> {
*
* @return Current vertex id that was released
*/
- public I releaseCurrentVertexId() {
- I releasedVertexId = vertexId;
- vertexId = null;
- return releasedVertexId;
- }
+ I releaseCurrentVertexId();
}
+
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessageBytesIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessageBytesIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessageBytesIterator.java
new file mode 100644
index 0000000..194bf69
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessageBytesIterator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataOutput;
+
+/**
+ * Special iterator that reuses vertex ids and messages bytes so that the
+ * lifetime of the object is only until next() is called.
+ *
+ * Vertex id ownership can be released if desired through
+ * releaseCurrentVertexId(). This optimization allows us to cut down
+ * on the number of objects instantiated and garbage collected. Messages
+ * can only be copied to an ExtendedDataOutput object
+ *
+ * @param <I> vertexId type parameter
+ * @param <M> message type parameter
+ */
+public interface VertexIdMessageBytesIterator<I extends WritableComparable,
+ M extends Writable> extends VertexIdDataIterator<I, M> {
+
+ /**
+ * Write the current message to a DataOutput object
+ *
+ * @param dataOutput Where the current message will be written to
+ */
+ void writeCurrentMessageBytes(DataOutput dataOutput);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessageIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessageIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessageIterator.java
new file mode 100644
index 0000000..c241cea
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessageIterator.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Special iterator that reuses vertex ids and message objects so that the
+ * lifetime of the object is only until next() is called.
+ *
+ * @param <I> vertexId type parameter
+ * @param <M> message type parameter
+ */
+public interface VertexIdMessageIterator<I extends WritableComparable,
+ M extends Writable> extends VertexIdDataIterator<I, M> {
+ /**
+ * Get the current message.
+ *
+ * @return Current message
+ */
+ M getCurrentMessage();
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessages.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessages.java b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessages.java
new file mode 100644
index 0000000..99e5d71
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessages.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Stores vertex ids and messages associated with a vertex
+ *
+ * @param <I> vertexId type parameter
+ * @param <M> message type parameter
+ */
+public interface VertexIdMessages<I extends WritableComparable,
+ M extends Writable> extends VertexIdData<I, M> {
+ /**
+ * Get specialized iterator that will instantiate the vertex id and
+ * message of this object. It will only produce message bytes, not actual
+ * messages and expects a different encoding.
+ *
+ * @return Special iterator that reuses vertex ids (unless released) and
+ * copies message bytes
+ */
+ VertexIdMessageBytesIterator<I, M> getVertexIdMessageBytesIterator();
+
+ /**
+ * Get specialized iterator that will instantiate the vertex id and
+ * message of this object.
+ *
+ * @return Special iterator that reuses vertex ids and messages unless
+ * specified
+ */
+ VertexIdMessageIterator<I, M> getVertexIdMessageIterator();
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
index 236bc88..157a543 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
@@ -27,6 +27,7 @@ import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.factories.TestMessageValueFactory;
+import org.apache.giraph.utils.VertexIdMessages;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.giraph.utils.IntNoOpComputation;
import org.apache.giraph.utils.MockUtils;
@@ -48,6 +49,7 @@ import static org.mockito.Mockito.when;
/**
* Test all the netty failure scenarios
*/
+@SuppressWarnings("unchecked")
public class RequestFailureTest {
/** Configuration */
private ImmutableClassesGiraphConfiguration conf;
@@ -75,10 +77,10 @@ public class RequestFailureTest {
private WritableRequest getRequest() {
// Data to send
final int partitionId = 0;
- PairList<Integer, ByteArrayVertexIdMessages<IntWritable,
+ PairList<Integer, VertexIdMessages<IntWritable,
IntWritable>>
dataToSend = new PairList<Integer,
- ByteArrayVertexIdMessages<IntWritable, IntWritable>>();
+ VertexIdMessages<IntWritable, IntWritable>>();
dataToSend.initialize();
ByteArrayVertexIdMessages<IntWritable,
IntWritable> vertexIdMessages =
@@ -97,6 +99,7 @@ public class RequestFailureTest {
// Send the request
SendWorkerMessagesRequest<IntWritable, IntWritable> request =
new SendWorkerMessagesRequest<IntWritable, IntWritable>(dataToSend);
+ request.setConf(conf);
return request;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
index fcdfa5c..32454f4 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
@@ -36,6 +36,7 @@ import org.apache.giraph.graph.VertexMutations;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.partition.PartitionStore;
+import org.apache.giraph.utils.VertexIdMessages;
import org.apache.giraph.utils.ByteArrayOneToAllMessages;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.giraph.utils.ExtendedDataOutput;
@@ -64,6 +65,7 @@ import static org.mockito.Mockito.when;
/**
* Test all the different netty requests.
*/
+@SuppressWarnings("unchecked")
public class RequestTest {
/** Configuration */
private ImmutableClassesGiraphConfiguration conf;
@@ -141,16 +143,15 @@ public class RequestTest {
@Test
public void sendWorkerMessagesRequest() throws IOException {
// Data to send
- PairList<Integer, ByteArrayVertexIdMessages<IntWritable,
+ PairList<Integer, VertexIdMessages<IntWritable,
IntWritable>>
- dataToSend = new PairList<Integer,
- ByteArrayVertexIdMessages<IntWritable, IntWritable>>();
+ dataToSend = new PairList<>();
dataToSend.initialize();
int partitionId = 0;
ByteArrayVertexIdMessages<IntWritable,
IntWritable> vertexIdMessages =
- new ByteArrayVertexIdMessages<IntWritable, IntWritable>(
- new TestMessageValueFactory<IntWritable>(IntWritable.class));
+ new ByteArrayVertexIdMessages<>(
+ new TestMessageValueFactory<>(IntWritable.class));
vertexIdMessages.setConf(conf);
vertexIdMessages.initialize();
dataToSend.add(partitionId, vertexIdMessages);
@@ -163,7 +164,9 @@ public class RequestTest {
// Send the request
SendWorkerMessagesRequest<IntWritable, IntWritable> request =
- new SendWorkerMessagesRequest<IntWritable, IntWritable>(dataToSend);
+ new SendWorkerMessagesRequest<>(dataToSend);
+ request.setConf(conf);
+
client.sendWritableRequest(workerInfo.getTaskId(), request);
client.waitAllRequests();
@@ -195,8 +198,8 @@ public class RequestTest {
public void sendWorkerOneToAllMessagesRequest() throws IOException {
// Data to send
ByteArrayOneToAllMessages<IntWritable, IntWritable>
- dataToSend = new ByteArrayOneToAllMessages<
- IntWritable, IntWritable>(new TestMessageValueFactory<IntWritable>(IntWritable.class));
+ dataToSend = new ByteArrayOneToAllMessages<>(new
+ TestMessageValueFactory<>(IntWritable.class));
dataToSend.setConf(conf);
dataToSend.initialize();
ExtendedDataOutput output = conf.createExtendedDataOutput();
@@ -208,7 +211,7 @@ public class RequestTest {
// Send the request
SendWorkerOneToAllMessagesRequest<IntWritable, IntWritable> request =
- new SendWorkerOneToAllMessagesRequest<IntWritable, IntWritable>(dataToSend, conf);
+ new SendWorkerOneToAllMessagesRequest<>(dataToSend, conf);
client.sendWritableRequest(workerInfo.getTaskId(), request);
client.waitAllRequests();
@@ -304,4 +307,4 @@ public class RequestTest {
}
assertEquals(55, keySum);
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java b/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
index 97e88f8..5d8d478 100644
--- a/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
+++ b/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
@@ -30,10 +30,7 @@ import org.apache.giraph.graph.GraphState;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.partition.BasicPartitionOwner;
import org.apache.giraph.partition.PartitionOwner;
-import org.apache.giraph.partition.PartitionStore;
import org.apache.giraph.partition.SimplePartition;
-import org.apache.giraph.partition.SimplePartitionStore;
-import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
@@ -196,7 +193,7 @@ public class MockUtils {
ServerData<IntWritable, IntWritable, IntWritable> serverData =
new ServerData<IntWritable, IntWritable, IntWritable>(
serviceWorker, conf, ByteArrayMessagesPerVertexStore.newFactory(
- serviceWorker, conf), context);
+ serviceWorker, conf), context);
// Here we add a partition to simulate the case that there is one partition.
serverData.getPartitionStore().addPartition(new SimplePartition());
return serverData;
[3/3] git commit: updated refs/heads/trunk to 535a333
Posted by pa...@apache.org.
GIRAPH-907: refactor giraph code to support multiple implementations of vertexId data (pavanka)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/535a333b
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/535a333b
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/535a333b
Branch: refs/heads/trunk
Commit: 535a333b7776f7a229b76c7656e928d8a21d21bc
Parents: e92f294
Author: Pavan Kumar <pa...@fb.com>
Authored: Sun Jun 8 08:25:03 2014 -0700
Committer: Pavan Kumar <pa...@fb.com>
Committed: Sun Jun 8 08:25:03 2014 -0700
----------------------------------------------------------------------
CHANGELOG | 6 +-
.../org/apache/giraph/comm/SendEdgeCache.java | 9 +-
.../apache/giraph/comm/SendMessageCache.java | 17 +-
.../giraph/comm/SendMessageToAllCache.java | 4 +-
.../giraph/comm/SendVertexIdDataCache.java | 20 +-
.../ByteArrayMessagesPerVertexStore.java | 48 ++-
.../giraph/comm/messages/MessageStore.java | 4 +-
.../giraph/comm/messages/MessagesIterable.java | 4 +-
.../comm/messages/OneMessagePerVertexStore.java | 9 +-
.../out_of_core/DiskBackedMessageStore.java | 7 +-
.../primitives/IntByteArrayMessageStore.java | 14 +-
.../primitives/IntFloatMessageStore.java | 10 +-
.../primitives/LongByteArrayMessageStore.java | 14 +-
.../primitives/LongDoubleMessageStore.java | 10 +-
.../giraph/comm/netty/InboundByteCounter.java | 1 -
.../NettyWorkerClientRequestProcessor.java | 8 +-
.../comm/netty/handler/RequestDecoder.java | 33 +-
.../comm/netty/handler/RequestEncoder.java | 59 ++--
.../netty/handler/RequestServerHandler.java | 27 +-
.../comm/netty/handler/ResponseEncoder.java | 27 +-
.../comm/requests/SendWorkerDataRequest.java | 21 +-
.../comm/requests/SendWorkerEdgesRequest.java | 11 +-
.../requests/SendWorkerMessagesRequest.java | 21 +-
.../giraph/comm/requests/WritableRequest.java | 1 +
.../apache/giraph/conf/GiraphConfiguration.java | 2 +-
.../org/apache/giraph/conf/GiraphConstants.java | 2 +-
.../ImmutableClassesGiraphConfiguration.java | 12 +
.../apache/giraph/edge/AbstractEdgeStore.java | 9 +-
.../java/org/apache/giraph/edge/EdgeStore.java | 4 +-
.../org/apache/giraph/edge/SimpleEdgeStore.java | 4 +-
.../giraph/edge/primitives/IntEdgeStore.java | 5 +-
.../giraph/edge/primitives/LongEdgeStore.java | 5 +-
.../giraph/utils/AbstractVertexIdData.java | 122 +++++++
.../apache/giraph/utils/ByteArrayIterable.java | 76 -----
.../apache/giraph/utils/ByteArrayIterator.java | 72 -----
.../giraph/utils/ByteArrayVertexIdData.java | 200 +-----------
.../giraph/utils/ByteArrayVertexIdEdges.java | 37 +--
.../giraph/utils/ByteArrayVertexIdMessages.java | 122 ++-----
.../apache/giraph/utils/ByteStructIterable.java | 61 ++++
.../apache/giraph/utils/ByteStructIterator.java | 72 +++++
.../utils/ByteStructVertexIdDataIterator.java | 83 +++++
.../utils/ByteStructVertexIdEdgeIterator.java | 56 ++++
.../utils/ByteStructVertexIdIterator.java | 71 +++++
.../ByteStructVertexIdMessageBytesIterator.java | 80 +++++
.../ByteStructVertexIdMessageIterator.java | 49 +++
.../java/org/apache/giraph/utils/ByteUtils.java | 47 +++
.../utils/ExtendedByteArrayDataInput.java | 2 +-
.../utils/ExtendedByteArrayDataOutput.java | 20 +-
.../apache/giraph/utils/ExtendedDataOutput.java | 18 ++
.../utils/RepresentativeByteArrayIterable.java | 68 ----
.../utils/RepresentativeByteArrayIterator.java | 52 ---
.../utils/RepresentativeByteStructIterable.java | 52 +++
.../utils/RepresentativeByteStructIterator.java | 52 +++
.../org/apache/giraph/utils/RequestUtils.java | 55 ++++
.../apache/giraph/utils/UnsafeArrayReads.java | 200 ++++++++++++
.../utils/UnsafeByteArrayInputStream.java | 319 +------------------
.../utils/UnsafeByteArrayOutputStream.java | 54 ++--
.../org/apache/giraph/utils/UnsafeReads.java | 209 ++++++++++++
.../utils/VerboseByteArrayMessageWrite.java | 60 ----
.../utils/VerboseByteStructMessageWrite.java | 72 +++++
.../org/apache/giraph/utils/VertexIdData.java | 123 +++++++
.../giraph/utils/VertexIdDataIterator.java | 49 +++
.../giraph/utils/VertexIdEdgeIterator.java | 47 +++
.../org/apache/giraph/utils/VertexIdEdges.java | 40 +++
.../apache/giraph/utils/VertexIdIterator.java | 41 +--
.../utils/VertexIdMessageBytesIterator.java | 47 +++
.../giraph/utils/VertexIdMessageIterator.java | 39 +++
.../apache/giraph/utils/VertexIdMessages.java | 50 +++
.../apache/giraph/comm/RequestFailureTest.java | 7 +-
.../org/apache/giraph/comm/RequestTest.java | 23 +-
.../java/org/apache/giraph/utils/MockUtils.java | 5 +-
71 files changed, 2020 insertions(+), 1260 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 11e75bb..a0e94c1 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,9 +1,11 @@
Giraph Change Log
Release 1.1.0 - unreleased
- GIRAPH-899: Remove hcatalog from hadoop_facebook profile
+ GIRAPH-907: refactor giraph code to support multiple implementations of vertexId data (pavanka)
- GIRAPH-873: Specialized edge stores
+ GIRAPH-899: Remove hcatalog from hadoop_facebook profile (pavanka)
+
+ GIRAPH-873: Specialized edge stores (pavanka)
GIRAPH-898: Remove giraph-accumulo from Facebook profile (edunov via majakabiljo)
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java
index 8350a55..23d3b8c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java
@@ -23,6 +23,7 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.utils.ByteArrayVertexIdEdges;
import org.apache.giraph.utils.PairList;
+import org.apache.giraph.utils.VertexIdEdges;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -38,7 +39,7 @@ import static org.apache.giraph.conf.GiraphConstants.MAX_EDGE_REQUEST_SIZE;
* @param <E> Edge value
*/
public class SendEdgeCache<I extends WritableComparable, E extends Writable>
- extends SendVertexIdDataCache<I, Edge<I, E>, ByteArrayVertexIdEdges<I, E>> {
+ extends SendVertexIdDataCache<I, Edge<I, E>, VertexIdEdges<I, E>> {
/**
* Constructor
*
@@ -52,7 +53,7 @@ public class SendEdgeCache<I extends WritableComparable, E extends Writable>
}
@Override
- public ByteArrayVertexIdEdges<I, E> createByteArrayVertexIdData() {
+ public VertexIdEdges<I, E> createVertexIdData() {
return new ByteArrayVertexIdEdges<I, E>();
}
@@ -78,7 +79,7 @@ public class SendEdgeCache<I extends WritableComparable, E extends Writable>
* @return List of pairs (partitionId, ByteArrayVertexIdEdges),
* where all partition ids belong to workerInfo
*/
- public PairList<Integer, ByteArrayVertexIdEdges<I, E>>
+ public PairList<Integer, VertexIdEdges<I, E>>
removeWorkerEdges(WorkerInfo workerInfo) {
return removeWorkerData(workerInfo);
}
@@ -88,7 +89,7 @@ public class SendEdgeCache<I extends WritableComparable, E extends Writable>
*
* @return All vertex edges for all partitions
*/
- public PairList<WorkerInfo, PairList<Integer, ByteArrayVertexIdEdges<I, E>>>
+ public PairList<WorkerInfo, PairList<Integer, VertexIdEdges<I, E>>>
removeAllEdges() {
return removeAllData();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
index 24848db..b1fec01 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
@@ -28,6 +28,7 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.partition.PartitionOwner;
+import org.apache.giraph.utils.VertexIdMessages;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.giraph.utils.PairList;
import org.apache.giraph.worker.WorkerInfo;
@@ -45,8 +46,9 @@ import static org.apache.giraph.conf.GiraphConstants.MAX_MSG_REQUEST_SIZE;
* @param <I> Vertex id
* @param <M> Message data
*/
+@SuppressWarnings("unchecked")
public class SendMessageCache<I extends WritableComparable, M extends Writable>
- extends SendVertexIdDataCache<I, M, ByteArrayVertexIdMessages<I, M>> {
+ extends SendVertexIdDataCache<I, M, VertexIdMessages<I, M>> {
/** Class logger */
private static final Logger LOG =
Logger.getLogger(SendMessageCache.class);
@@ -58,7 +60,6 @@ public class SendMessageCache<I extends WritableComparable, M extends Writable>
protected final int maxMessagesSizePerWorker;
/** NettyWorkerClientRequestProcessor for message sending */
protected final NettyWorkerClientRequestProcessor<I, ?, ?> clientProcessor;
-
/**
* Constructor
*
@@ -78,7 +79,7 @@ public class SendMessageCache<I extends WritableComparable, M extends Writable>
}
@Override
- public ByteArrayVertexIdMessages<I, M> createByteArrayVertexIdData() {
+ public VertexIdMessages<I, M> createVertexIdData() {
return new ByteArrayVertexIdMessages<I, M>(
getConf().getOutgoingMessageValueFactory());
}
@@ -122,7 +123,7 @@ public class SendMessageCache<I extends WritableComparable, M extends Writable>
* @return List of pairs (partitionId, ByteArrayVertexIdMessages),
* where all partition ids belong to workerInfo
*/
- protected PairList<Integer, ByteArrayVertexIdMessages<I, M>>
+ protected PairList<Integer, VertexIdMessages<I, M>>
removeWorkerMessages(WorkerInfo workerInfo) {
return removeWorkerData(workerInfo);
}
@@ -133,7 +134,7 @@ public class SendMessageCache<I extends WritableComparable, M extends Writable>
* @return All vertex messages for all partitions
*/
private PairList<WorkerInfo, PairList<
- Integer, ByteArrayVertexIdMessages<I, M>>> removeAllMessages() {
+ Integer, VertexIdMessages<I, M>>> removeAllMessages() {
return removeAllData();
}
@@ -159,7 +160,7 @@ public class SendMessageCache<I extends WritableComparable, M extends Writable>
// Send a request if the cache of outgoing message to
// the remote worker 'workerInfo' is full enough to be flushed
if (workerMessageSize >= maxMessagesSizePerWorker) {
- PairList<Integer, ByteArrayVertexIdMessages<I, M>>
+ PairList<Integer, VertexIdMessages<I, M>>
workerMessages = removeWorkerMessages(workerInfo);
WritableRequest writableRequest =
new SendWorkerMessagesRequest<I, M>(workerMessages);
@@ -233,10 +234,10 @@ public class SendMessageCache<I extends WritableComparable, M extends Writable>
*/
public void flush() {
PairList<WorkerInfo, PairList<Integer,
- ByteArrayVertexIdMessages<I, M>>>
+ VertexIdMessages<I, M>>>
remainingMessageCache = removeAllMessages();
PairList<WorkerInfo, PairList<
- Integer, ByteArrayVertexIdMessages<I, M>>>.Iterator
+ Integer, VertexIdMessages<I, M>>>.Iterator
iterator = remainingMessageCache.getIterator();
while (iterator.hasNext()) {
iterator.next();
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageToAllCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageToAllCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageToAllCache.java
index 54234c5..60858ea 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageToAllCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageToAllCache.java
@@ -30,7 +30,7 @@ import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.utils.ByteArrayOneToAllMessages;
-import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.VertexIdMessages;
import org.apache.giraph.utils.ExtendedDataOutput;
import org.apache.giraph.utils.PairList;
import org.apache.giraph.worker.WorkerInfo;
@@ -249,7 +249,7 @@ public class SendMessageToAllCache<I extends WritableComparable,
}
++totalMsgsSentInSuperstep;
if (workerMessageSize >= maxMessagesSizePerWorker) {
- PairList<Integer, ByteArrayVertexIdMessages<I, M>>
+ PairList<Integer, VertexIdMessages<I, M>>
workerMessages = removeWorkerMessages(workerInfoList[i]);
writableRequest =
new SendWorkerMessagesRequest<I, M>(workerMessages);
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/SendVertexIdDataCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendVertexIdDataCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendVertexIdDataCache.java
index afce3ba..19aa991 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendVertexIdDataCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendVertexIdDataCache.java
@@ -20,7 +20,7 @@ package org.apache.giraph.comm;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.utils.ByteArrayVertexIdData;
+import org.apache.giraph.utils.VertexIdData;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.WritableComparable;
@@ -32,12 +32,12 @@ import javax.annotation.concurrent.NotThreadSafe;
*
* @param <I> Vertex id
* @param <T> Data
- * @param <B> Specialization of {@link ByteArrayVertexIdData} for T
+ * @param <B> Specialization of {@link VertexIdData} for T
*/
@NotThreadSafe
@SuppressWarnings("unchecked")
public abstract class SendVertexIdDataCache<I extends WritableComparable, T,
- B extends ByteArrayVertexIdData<I, T>> extends SendDataCache<B> {
+ B extends VertexIdData<I, T>> extends SendDataCache<B> {
/**
* Constructor.
*
@@ -55,11 +55,11 @@ public abstract class SendVertexIdDataCache<I extends WritableComparable, T,
}
/**
- * Create a new {@link ByteArrayVertexIdData} specialized for the use case.
+ * Create a new {@link VertexIdData} specialized for the use case.
*
- * @return A new instance of {@link ByteArrayVertexIdData}
+ * @return A new instance of {@link VertexIdData}
*/
- public abstract B createByteArrayVertexIdData();
+ public abstract B createVertexIdData();
/**
* Add data to the cache.
@@ -73,7 +73,7 @@ public abstract class SendVertexIdDataCache<I extends WritableComparable, T,
public int addData(WorkerInfo workerInfo,
int partitionId, I destVertexId, T data) {
// Get the data collection
- ByteArrayVertexIdData<I, T> partitionData =
+ VertexIdData<I, T> partitionData =
getPartitionData(workerInfo, partitionId);
int originalSize = partitionData.getSize();
partitionData.add(destVertexId, data);
@@ -97,7 +97,7 @@ public abstract class SendVertexIdDataCache<I extends WritableComparable, T,
public int addData(WorkerInfo workerInfo, int partitionId,
byte[] serializedId, int idPos, T data) {
// Get the data collection
- ByteArrayVertexIdData<I, T> partitionData =
+ VertexIdData<I, T> partitionData =
getPartitionData(workerInfo, partitionId);
int originalSize = partitionData.getSize();
partitionData.add(serializedId, idPos, data);
@@ -114,12 +114,12 @@ public abstract class SendVertexIdDataCache<I extends WritableComparable, T,
* @param partitionId The remote Partition this message belongs to
* @return The partition data in data cache
*/
- private ByteArrayVertexIdData<I, T> getPartitionData(WorkerInfo workerInfo,
+ private VertexIdData<I, T> getPartitionData(WorkerInfo workerInfo,
int partitionId) {
// Get the data collection
B partitionData = getData(partitionId);
if (partitionData == null) {
- partitionData = createByteArrayVertexIdData();
+ partitionData = createVertexIdData();
partitionData.setConf(getConf());
partitionData.initialize(getInitialBufferSize(workerInfo.getTaskId()));
setData(partitionId, partitionData);
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
index e8b3b30..65939bb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
@@ -26,11 +26,12 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.factories.MessageValueFactory;
-import org.apache.giraph.utils.ByteArrayVertexIdMessages;
-import org.apache.giraph.utils.ExtendedDataInput;
-import org.apache.giraph.utils.RepresentativeByteArrayIterator;
-import org.apache.giraph.utils.VerboseByteArrayMessageWrite;
import org.apache.giraph.utils.VertexIdIterator;
+import org.apache.giraph.utils.VertexIdMessageBytesIterator;
+import org.apache.giraph.utils.VertexIdMessageIterator;
+import org.apache.giraph.utils.VertexIdMessages;
+import org.apache.giraph.utils.RepresentativeByteStructIterator;
+import org.apache.giraph.utils.VerboseByteStructMessageWrite;
import org.apache.giraph.utils.io.DataInputOutput;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -39,7 +40,8 @@ import com.google.common.collect.Iterators;
/**
* Implementation of {@link SimpleMessageStore} where multiple messages are
- * stored per vertex as byte arrays. Used when there is no combiner provided.
+ * stored per vertex as byte-backed data structures.
+ * Used when there is no combiner provided.
*
* @param <I> Vertex id
* @param <M> Message data
@@ -88,10 +90,10 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
@Override
public void addPartitionMessages(
int partitionId,
- ByteArrayVertexIdMessages<I, M> messages) throws IOException {
+ VertexIdMessages<I, M> messages) throws IOException {
ConcurrentMap<I, DataInputOutput> partitionMap =
getOrCreatePartitionMap(partitionId);
- ByteArrayVertexIdMessages<I, M>.VertexIdMessageBytesIterator
+ VertexIdMessageBytesIterator<I, M>
vertexIdMessageBytesIterator =
messages.getVertexIdMessageBytesIterator();
// Try to copy the message buffer over rather than
@@ -111,7 +113,7 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
}
}
} else {
- ByteArrayVertexIdMessages<I, M>.VertexIdMessageIterator
+ VertexIdMessageIterator<I, M>
vertexIdMessageIterator = messages.getVertexIdMessageIterator();
while (vertexIdMessageIterator.hasNext()) {
vertexIdMessageIterator.next();
@@ -119,7 +121,7 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
getDataInputOutput(partitionMap, vertexIdMessageIterator);
synchronized (dataInputOutput) {
- VerboseByteArrayMessageWrite.verboseWriteCurrentMessage(
+ VerboseByteStructMessageWrite.verboseWriteCurrentMessage(
vertexIdMessageIterator, dataInputOutput.getDataOutput());
}
}
@@ -132,33 +134,19 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
return new MessagesIterable<M>(dataInputOutput, messageValueFactory);
}
- /**
- * Special iterator only for counting messages
- */
- private class RepresentativeMessageIterator extends
- RepresentativeByteArrayIterator<M> {
- /**
- * Constructor
- *
- * @param dataInput DataInput containing the messages
- */
- public RepresentativeMessageIterator(ExtendedDataInput dataInput) {
- super(dataInput);
- }
-
- @Override
- protected M createWritable() {
- return messageValueFactory.newInstance();
- }
- }
-
@Override
protected int getNumberOfMessagesIn(
ConcurrentMap<I, DataInputOutput> partitionMap) {
int numberOfMessages = 0;
for (DataInputOutput dataInputOutput : partitionMap.values()) {
numberOfMessages += Iterators.size(
- new RepresentativeMessageIterator(dataInputOutput.createDataInput()));
+ new RepresentativeByteStructIterator<M>(
+ dataInputOutput.createDataInput()) {
+ @Override
+ protected M createWritable() {
+ return messageValueFactory.newInstance();
+ }
+ });
}
return numberOfMessages;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
index 2af7642..7d0bbc6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
@@ -21,7 +21,7 @@ package org.apache.giraph.comm.messages;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.VertexIdMessages;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -75,7 +75,7 @@ public interface MessageStore<I extends WritableComparable,
* @throws IOException
*/
void addPartitionMessages(
- int partitionId, ByteArrayVertexIdMessages<I, M> messages)
+ int partitionId, VertexIdMessages<I, M> messages)
throws IOException;
/**
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessagesIterable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessagesIterable.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessagesIterable.java
index 3b22ab3..abbac0b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessagesIterable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessagesIterable.java
@@ -21,7 +21,7 @@ package org.apache.giraph.comm.messages;
import org.apache.giraph.factories.MessageValueFactory;
import org.apache.giraph.utils.ExtendedDataInput;
import org.apache.giraph.utils.Factory;
-import org.apache.giraph.utils.RepresentativeByteArrayIterable;
+import org.apache.giraph.utils.RepresentativeByteStructIterable;
import org.apache.hadoop.io.Writable;
/**
@@ -30,7 +30,7 @@ import org.apache.hadoop.io.Writable;
* @param <M> Message data
*/
public class MessagesIterable<M extends Writable>
- extends RepresentativeByteArrayIterable<M> {
+ extends RepresentativeByteStructIterable<M> {
/** Message class */
private final MessageValueFactory<M> messageValueFactory;
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
index bb581c0..9bede06 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
@@ -28,7 +28,8 @@ import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.combiner.MessageCombiner;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.factories.MessageValueFactory;
-import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.VertexIdMessageIterator;
+import org.apache.giraph.utils.VertexIdMessages;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -64,11 +65,11 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
@Override
public void addPartitionMessages(
int partitionId,
- ByteArrayVertexIdMessages<I, M> messages) throws IOException {
+ VertexIdMessages<I, M> messages) throws IOException {
ConcurrentMap<I, M> partitionMap =
getOrCreatePartitionMap(partitionId);
- ByteArrayVertexIdMessages<I, M>.VertexIdMessageIterator
- vertexIdMessageIterator = messages.getVertexIdMessageIterator();
+ VertexIdMessageIterator<I, M> vertexIdMessageIterator =
+ messages.getVertexIdMessageIterator();
// This loop is a little complicated as it is optimized to only create
// the minimal amount of vertex id and message objects as possible.
while (vertexIdMessageIterator.hasNext()) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
index 1a76306..18b7798 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
@@ -24,8 +24,9 @@ import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.messages.MessageStore;
import org.apache.giraph.comm.messages.MessageStoreFactory;
import org.apache.giraph.factories.MessageValueFactory;
-import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.giraph.utils.EmptyIterable;
+import org.apache.giraph.utils.VertexIdMessageIterator;
+import org.apache.giraph.utils.VertexIdMessages;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
@@ -84,10 +85,10 @@ public class DiskBackedMessageStore<I extends WritableComparable,
@Override
public void addPartitionMessages(
int partitionId,
- ByteArrayVertexIdMessages<I, M> messages) throws IOException {
+ VertexIdMessages<I, M> messages) throws IOException {
PartitionDiskBackedMessageStore<I, M> partitionMessageStore =
getMessageStore(partitionId);
- ByteArrayVertexIdMessages<I, M>.VertexIdMessageIterator
+ VertexIdMessageIterator<I, M>
vertexIdMessageIterator =
messages.getVertexIdMessageIterator();
while (vertexIdMessageIterator.hasNext()) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
index cc14c6d..dbc1ce8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
@@ -24,9 +24,11 @@ import org.apache.giraph.comm.messages.MessagesIterable;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.factories.MessageValueFactory;
import org.apache.giraph.partition.Partition;
-import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.VertexIdMessageBytesIterator;
+import org.apache.giraph.utils.VertexIdMessageIterator;
+import org.apache.giraph.utils.VertexIdMessages;
import org.apache.giraph.utils.EmptyIterable;
-import org.apache.giraph.utils.VerboseByteArrayMessageWrite;
+import org.apache.giraph.utils.VerboseByteStructMessageWrite;
import org.apache.giraph.utils.io.DataInputOutput;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
@@ -123,12 +125,12 @@ public class IntByteArrayMessageStore<M extends Writable>
@Override
public void addPartitionMessages(int partitionId,
- ByteArrayVertexIdMessages<IntWritable, M> messages) throws
+ VertexIdMessages<IntWritable, M> messages) throws
IOException {
Int2ObjectOpenHashMap<DataInputOutput> partitionMap =
map.get(partitionId);
synchronized (partitionMap) {
- ByteArrayVertexIdMessages<IntWritable, M>.VertexIdMessageBytesIterator
+ VertexIdMessageBytesIterator<IntWritable, M>
vertexIdMessageBytesIterator =
messages.getVertexIdMessageBytesIterator();
// Try to copy the message buffer over rather than
@@ -145,13 +147,13 @@ public class IntByteArrayMessageStore<M extends Writable>
dataInputOutput.getDataOutput());
}
} else {
- ByteArrayVertexIdMessages<IntWritable, M>.VertexIdMessageIterator
+ VertexIdMessageIterator<IntWritable, M>
iterator = messages.getVertexIdMessageIterator();
while (iterator.hasNext()) {
iterator.next();
DataInputOutput dataInputOutput = getDataInputOutput(partitionMap,
iterator.getCurrentVertexId().get());
- VerboseByteArrayMessageWrite.verboseWriteCurrentMessage(iterator,
+ VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator,
dataInputOutput.getDataOutput());
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
index 3318610..be75ee8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
@@ -22,9 +22,10 @@ import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.combiner.MessageCombiner;
import org.apache.giraph.comm.messages.MessageStore;
import org.apache.giraph.partition.Partition;
-
import org.apache.giraph.partition.PartitionStore;
-import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+
+import org.apache.giraph.utils.VertexIdMessageIterator;
+import org.apache.giraph.utils.VertexIdMessages;
import org.apache.giraph.utils.EmptyIterable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
@@ -97,7 +98,7 @@ public class IntFloatMessageStore
@Override
public void addPartitionMessages(int partitionId,
- ByteArrayVertexIdMessages<IntWritable, FloatWritable> messages) throws
+ VertexIdMessages<IntWritable, FloatWritable> messages) throws
IOException {
IntWritable reusableVertexId = new IntWritable();
FloatWritable reusableMessage = new FloatWritable();
@@ -105,8 +106,7 @@ public class IntFloatMessageStore
Int2FloatOpenHashMap partitionMap = map.get(partitionId);
synchronized (partitionMap) {
- ByteArrayVertexIdMessages<IntWritable,
- FloatWritable>.VertexIdMessageIterator
+ VertexIdMessageIterator<IntWritable, FloatWritable>
iterator = messages.getVertexIdMessageIterator();
while (iterator.hasNext()) {
iterator.next();
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java
index 9e4325f..3110864 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java
@@ -24,9 +24,11 @@ import org.apache.giraph.comm.messages.MessagesIterable;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.factories.MessageValueFactory;
import org.apache.giraph.partition.Partition;
-import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.VertexIdMessageBytesIterator;
+import org.apache.giraph.utils.VertexIdMessageIterator;
+import org.apache.giraph.utils.VertexIdMessages;
+import org.apache.giraph.utils.VerboseByteStructMessageWrite;
import org.apache.giraph.utils.EmptyIterable;
-import org.apache.giraph.utils.VerboseByteArrayMessageWrite;
import org.apache.giraph.utils.io.DataInputOutput;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
@@ -124,12 +126,12 @@ public class LongByteArrayMessageStore<M extends Writable>
@Override
public void addPartitionMessages(int partitionId,
- ByteArrayVertexIdMessages<LongWritable, M> messages) throws
+ VertexIdMessages<LongWritable, M> messages) throws
IOException {
Long2ObjectOpenHashMap<DataInputOutput> partitionMap =
map.get(partitionId);
synchronized (partitionMap) {
- ByteArrayVertexIdMessages<LongWritable, M>.VertexIdMessageBytesIterator
+ VertexIdMessageBytesIterator<LongWritable, M>
vertexIdMessageBytesIterator =
messages.getVertexIdMessageBytesIterator();
// Try to copy the message buffer over rather than
@@ -146,13 +148,13 @@ public class LongByteArrayMessageStore<M extends Writable>
dataInputOutput.getDataOutput());
}
} else {
- ByteArrayVertexIdMessages<LongWritable, M>.VertexIdMessageIterator
+ VertexIdMessageIterator<LongWritable, M>
iterator = messages.getVertexIdMessageIterator();
while (iterator.hasNext()) {
iterator.next();
DataInputOutput dataInputOutput = getDataInputOutput(partitionMap,
iterator.getCurrentVertexId().get());
- VerboseByteArrayMessageWrite.verboseWriteCurrentMessage(iterator,
+ VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator,
dataInputOutput.getDataOutput());
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
index 76d9ffa..264e65a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
@@ -22,7 +22,8 @@ import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.combiner.MessageCombiner;
import org.apache.giraph.comm.messages.MessageStore;
import org.apache.giraph.partition.Partition;
-import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.VertexIdMessageIterator;
+import org.apache.giraph.utils.VertexIdMessages;
import org.apache.giraph.utils.EmptyIterable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
@@ -94,7 +95,7 @@ public class LongDoubleMessageStore
@Override
public void addPartitionMessages(int partitionId,
- ByteArrayVertexIdMessages<LongWritable, DoubleWritable> messages) throws
+ VertexIdMessages<LongWritable, DoubleWritable> messages) throws
IOException {
LongWritable reusableVertexId = new LongWritable();
DoubleWritable reusableMessage = new DoubleWritable();
@@ -102,9 +103,8 @@ public class LongDoubleMessageStore
Long2DoubleOpenHashMap partitionMap = map.get(partitionId);
synchronized (partitionMap) {
- ByteArrayVertexIdMessages<LongWritable,
- DoubleWritable>.VertexIdMessageIterator
- iterator = messages.getVertexIdMessageIterator();
+ VertexIdMessageIterator<LongWritable, DoubleWritable> iterator =
+ messages.getVertexIdMessageIterator();
while (iterator.hasNext()) {
iterator.next();
long vertexId = iterator.getCurrentVertexId().get();
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/netty/InboundByteCounter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/InboundByteCounter.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/InboundByteCounter.java
index bcc888d..44b9c5d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/InboundByteCounter.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/InboundByteCounter.java
@@ -70,7 +70,6 @@ public class InboundByteCounter extends ChannelInboundHandlerAdapter implements
"size = " + receivedBytes + ", total bytes = " +
getBytesReceived());
}
-
}
ctx.fireChannelRead(msg);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
index 43c01ce..ef3f824 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
@@ -48,10 +48,10 @@ import org.apache.giraph.metrics.MetricNames;
import org.apache.giraph.metrics.SuperstepMetricsRegistry;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.partition.PartitionOwner;
-import org.apache.giraph.utils.ByteArrayVertexIdEdges;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.giraph.utils.ExtendedDataOutput;
import org.apache.giraph.utils.PairList;
+import org.apache.giraph.utils.VertexIdEdges;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -302,7 +302,7 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
// Send a request if the cache of outgoing edges to the remote worker is
// full
if (workerEdgesSize >= maxEdgesSizePerWorker) {
- PairList<Integer, ByteArrayVertexIdEdges<I, E>> workerEdges =
+ PairList<Integer, VertexIdEdges<I, E>> workerEdges =
sendEdgeCache.removeWorkerEdges(workerInfo);
WritableRequest writableRequest =
new SendWorkerEdgesRequest<I, E>(workerEdges);
@@ -414,10 +414,10 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
// Execute the remaining sends edges (if any)
PairList<WorkerInfo, PairList<Integer,
- ByteArrayVertexIdEdges<I, E>>>
+ VertexIdEdges<I, E>>>
remainingEdgeCache = sendEdgeCache.removeAllEdges();
PairList<WorkerInfo,
- PairList<Integer, ByteArrayVertexIdEdges<I, E>>>.Iterator
+ PairList<Integer, VertexIdEdges<I, E>>>.Iterator
edgeIterator = remainingEdgeCache.getIterator();
while (edgeIterator.hasNext()) {
edgeIterator.next();
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestDecoder.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestDecoder.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestDecoder.java
index 98a61e6..8659e95 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestDecoder.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestDecoder.java
@@ -22,14 +22,14 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.comm.netty.InboundByteCounter;
import org.apache.giraph.comm.requests.RequestType;
import org.apache.giraph.comm.requests.WritableRequest;
-import org.apache.giraph.utils.ReflectionUtils;
import org.apache.giraph.time.SystemTime;
import org.apache.giraph.time.Time;
import org.apache.giraph.time.Times;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.giraph.utils.RequestUtils;
import org.apache.log4j.Logger;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufInputStream;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
@@ -56,7 +56,7 @@ public class RequestDecoder extends ChannelInboundHandlerAdapter {
* @param byteCounter Keeps track of the decoded bytes
*/
public RequestDecoder(ImmutableClassesGiraphConfiguration conf,
- InboundByteCounter byteCounter) {
+ InboundByteCounter byteCounter) {
this.conf = conf;
this.byteCounter = byteCounter;
}
@@ -80,26 +80,23 @@ public class RequestDecoder extends ChannelInboundHandlerAdapter {
}
// Decode the request
- ByteBuf buffer = (ByteBuf) msg;
- ByteBufInputStream inputStream = new ByteBufInputStream(buffer);
- int enumValue = inputStream.readByte();
+ ByteBuf buf = (ByteBuf) msg;
+ int enumValue = buf.readByte();
RequestType type = RequestType.values()[enumValue];
- Class<? extends WritableRequest> writableRequestClass =
- type.getRequestClass();
+ Class<? extends WritableRequest> requestClass = type.getRequestClass();
+ WritableRequest request =
+ ReflectionUtils.newInstance(requestClass, conf);
+ request = RequestUtils.decodeWritableRequest(buf, request);
- WritableRequest writableRequest =
- ReflectionUtils.newInstance(writableRequestClass, conf);
- writableRequest.readFields(inputStream);
if (LOG.isDebugEnabled()) {
- LOG.debug("decode: Client " + writableRequest.getClientId() +
- ", requestId " + writableRequest.getRequestId() +
- ", " + writableRequest.getType() + ", with size " +
- buffer.array().length + " took " +
+ LOG.debug("decode: Client " + request.getClientId() +
+ ", requestId " + request.getRequestId() +
+ ", " + request.getType() + ", with size " +
+ buf.writerIndex() + " took " +
Times.getNanosSince(TIME, startDecodingNanoseconds) + " ns");
}
- // release bytebuf
- ReferenceCountUtil.release(buffer);
+ ReferenceCountUtil.release(buf);
// fire writableRequest object to upstream handlers
- ctx.fireChannelRead(writableRequest);
+ ctx.fireChannelRead(request);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
index d379eda..13fa82f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
@@ -18,6 +18,7 @@
package org.apache.giraph.comm.netty.handler;
+import io.netty.buffer.ByteBufOutputStream;
import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
@@ -27,25 +28,23 @@ import org.apache.giraph.time.Times;
import org.apache.log4j.Logger;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_BYTE;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_INT;
+
/**
* Requests have a request type and an encoded request.
*/
public class RequestEncoder extends ChannelOutboundHandlerAdapter {
- /** Time class to use */
- private static final Time TIME = SystemTime.get();
/** Class logger */
private static final Logger LOG = Logger.getLogger(RequestEncoder.class);
- /** Holds the place of the message length until known */
- private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
+ /** Time class to use */
+ private static final Time TIME = SystemTime.get();
/** Buffer starting size */
private final int bufferStartingSize;
- /** Whether or not to use direct byte buffers */
- private final boolean useDirectBuffers;
/** Start nanoseconds for the encoding time */
private long startEncodingNanoseconds = -1;
@@ -57,14 +56,11 @@ public class RequestEncoder extends ChannelOutboundHandlerAdapter {
public RequestEncoder(GiraphConfiguration conf) {
bufferStartingSize =
GiraphConstants.NETTY_REQUEST_ENCODER_BUFFER_SIZE.get(conf);
- useDirectBuffers =
- GiraphConstants.NETTY_REQUEST_ENCODER_USE_DIRECT_BUFFERS.get(conf);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg,
- ChannelPromise promise)
- throws Exception {
+ ChannelPromise promise) throws Exception {
if (!(msg instanceof WritableRequest)) {
throw new IllegalArgumentException(
"encode: Got a message of type " + msg.getClass());
@@ -74,40 +70,41 @@ public class RequestEncoder extends ChannelOutboundHandlerAdapter {
if (LOG.isDebugEnabled()) {
startEncodingNanoseconds = TIME.getNanoseconds();
}
+
ByteBuf buf;
- WritableRequest writableRequest = (WritableRequest) msg;
- int requestSize = writableRequest.getSerializedSize();
+ WritableRequest request = (WritableRequest) msg;
+ int requestSize = request.getSerializedSize();
if (requestSize == WritableRequest.UNKNOWN_SIZE) {
buf = ctx.alloc().buffer(bufferStartingSize);
} else {
- requestSize += LENGTH_PLACEHOLDER.length + 1;
- buf = useDirectBuffers ? ctx.alloc().directBuffer(requestSize) :
- ctx.alloc().buffer(requestSize);
+ requestSize += SIZE_OF_INT + SIZE_OF_BYTE;
+ buf = ctx.alloc().buffer(requestSize);
}
- ByteBufOutputStream outputStream =
- new ByteBufOutputStream(buf);
- outputStream.write(LENGTH_PLACEHOLDER);
- outputStream.writeByte(writableRequest.getType().ordinal());
+ ByteBufOutputStream output = new ByteBufOutputStream(buf);
+
+ // This will later be filled with the correct size of serialized request
+ output.writeInt(0);
+ output.writeByte(request.getType().ordinal());
try {
- writableRequest.write(outputStream);
+ request.write(output);
} catch (IndexOutOfBoundsException e) {
- LOG.error("encode: Most likely the size of request was not properly " +
- "specified (this buffer is too small) - see getSerializedSize() in " +
- writableRequest.getType().getRequestClass());
+ LOG.error("write: Most likely the size of request was not properly " +
+ "specified (this buffer is too small) - see getSerializedSize() " +
+ "in " + request.getType().getRequestClass());
throw new IllegalStateException(e);
}
- outputStream.flush();
- outputStream.close();
+ output.flush();
+ output.close();
// Set the correct size at the end
- buf.setInt(0, buf.writerIndex() - 4);
+ buf.setInt(0, buf.writerIndex() - SIZE_OF_INT);
if (LOG.isDebugEnabled()) {
- LOG.debug("encode: Client " + writableRequest.getClientId() + ", " +
- "requestId " + writableRequest.getRequestId() +
+ LOG.debug("write: Client " + request.getClientId() + ", " +
+ "requestId " + request.getRequestId() +
", size = " + buf.readableBytes() + ", " +
- writableRequest.getType() + " took " +
+ request.getType() + " took " +
Times.getNanosSince(TIME, startEncodingNanoseconds) + " ns");
}
- ctx.writeAndFlush(buf, promise);
+ ctx.write(buf, promise);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
index 601cd2f..b6d0533 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
@@ -80,13 +80,13 @@ public abstract class RequestServerHandler<R> extends
LOG.trace("messageReceived: Got " + msg.getClass());
}
- WritableRequest writableRequest = (WritableRequest) msg;
+ WritableRequest request = (WritableRequest) msg;
// Simulate a closed connection on the first request (if desired)
if (closeFirstRequest && !ALREADY_CLOSED_FIRST_REQUEST) {
LOG.info("messageReceived: Simulating closing channel on first " +
- "request " + writableRequest.getRequestId() + " from " +
- writableRequest.getClientId());
+ "request " + request.getRequestId() + " from " +
+ request.getClientId());
setAlreadyClosedFirstRequest();
ctx.close();
return;
@@ -95,24 +95,24 @@ public abstract class RequestServerHandler<R> extends
// Only execute this request exactly once
int alreadyDone = 1;
if (workerRequestReservedMap.reserveRequest(
- writableRequest.getClientId(),
- writableRequest.getRequestId())) {
+ request.getClientId(),
+ request.getRequestId())) {
if (LOG.isDebugEnabled()) {
startProcessingNanoseconds = TIME.getNanoseconds();
}
- processRequest((R) writableRequest);
+ processRequest((R) request);
if (LOG.isDebugEnabled()) {
LOG.debug("messageReceived: Processing client " +
- writableRequest.getClientId() + ", " +
- "requestId " + writableRequest.getRequestId() +
- ", " + writableRequest.getType() + " took " +
+ request.getClientId() + ", " +
+ "requestId " + request.getRequestId() +
+ ", " + request.getType() + " took " +
Times.getNanosSince(TIME, startProcessingNanoseconds) + " ns");
}
alreadyDone = 0;
} else {
LOG.info("messageReceived: Request id " +
- writableRequest.getRequestId() + " from client " +
- writableRequest.getClientId() +
+ request.getRequestId() + " from client " +
+ request.getClientId() +
" was already processed, " +
"not processing again.");
}
@@ -120,9 +120,10 @@ public abstract class RequestServerHandler<R> extends
// Send the response with the request id
ByteBuf buffer = ctx.alloc().buffer(RESPONSE_BYTES);
buffer.writeInt(myTaskInfo.getTaskId());
- buffer.writeLong(writableRequest.getRequestId());
+ buffer.writeLong(request.getRequestId());
buffer.writeByte(alreadyDone);
- ctx.writeAndFlush(buffer);
+
+ ctx.write(buffer);
}
/**
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseEncoder.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseEncoder.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseEncoder.java
index c0b45fc..4664fbe 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseEncoder.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseEncoder.java
@@ -18,15 +18,17 @@
package org.apache.giraph.comm.netty.handler;
+import io.netty.buffer.ByteBufOutputStream;
import org.apache.giraph.comm.requests.RequestType;
import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.log4j.Logger;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_INT;
+
/**
* How a server should respond to a client. Currently only used for
* responding to client's SASL messages, and removed after client
@@ -35,8 +37,6 @@ import io.netty.channel.ChannelOutboundHandlerAdapter;
public class ResponseEncoder extends ChannelOutboundHandlerAdapter {
/** Class logger. */
private static final Logger LOG = Logger.getLogger(ResponseEncoder.class);
- /** Holds the place of the message length until known. */
- private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
@Override
public void write(ChannelHandlerContext ctx, Object msg,
@@ -52,28 +52,27 @@ public class ResponseEncoder extends ChannelOutboundHandlerAdapter {
" WritableRequest.");
}
@SuppressWarnings("unchecked")
- WritableRequest writableRequest =
- (WritableRequest) msg;
+ WritableRequest writableRequest = (WritableRequest) msg;
+
ByteBuf buf = ctx.alloc().buffer(10);
- ByteBufOutputStream outputStream =
- new ByteBufOutputStream(buf);
+ ByteBufOutputStream output = new ByteBufOutputStream(buf);
if (LOG.isDebugEnabled()) {
LOG.debug("encode: Encoding a message of type " + msg.getClass());
}
- outputStream.write(LENGTH_PLACEHOLDER);
+ // Space is reserved now to be filled later by the serialized request size
+ output.writeInt(0);
// write type of object.
- outputStream.writeByte(writableRequest.getType().ordinal());
-
+ output.writeByte(writableRequest.getType().ordinal());
// write the object itself.
- writableRequest.write(outputStream);
+ writableRequest.write(output);
- outputStream.flush();
- outputStream.close();
+ output.flush();
+ output.close();
// Set the correct size at the end.
- buf.setInt(0, buf.writerIndex() - 4);
+ buf.setInt(0, buf.writerIndex() - SIZE_OF_INT);
if (LOG.isDebugEnabled()) {
LOG.debug("encode: Encoding a message of type " + msg.getClass());
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerDataRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerDataRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerDataRequest.java
index 4f80224..ceb5050 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerDataRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerDataRequest.java
@@ -18,7 +18,7 @@
package org.apache.giraph.comm.requests;
-import org.apache.giraph.utils.ByteArrayVertexIdData;
+import org.apache.giraph.utils.VertexIdData;
import org.apache.giraph.utils.PairList;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;
@@ -33,10 +33,12 @@ import java.io.IOException;
*
* @param <I> Vertex id
* @param <T> Data
- * @param <B> Specialization of {@link ByteArrayVertexIdData} for T
+ * @param <B> Specialization of
+ * {@link org.apache.giraph.utils.VertexIdData} for T
*/
+@SuppressWarnings("unchecked")
public abstract class SendWorkerDataRequest<I extends WritableComparable, T,
- B extends ByteArrayVertexIdData<I, T>>
+ B extends VertexIdData<I, T>>
extends WritableRequest implements WorkerRequest {
/** Class logger */
private static final Logger LOG =
@@ -56,8 +58,7 @@ public abstract class SendWorkerDataRequest<I extends WritableComparable, T,
/**
* Constructor used to send request.
*
- * @param partVertData Map of remote partitions =>
- * ByteArrayVertexIdData
+ * @param partVertData Map of remote partitions => VertexIdData
*/
public SendWorkerDataRequest(
PairList<Integer, B> partVertData) {
@@ -65,11 +66,13 @@ public abstract class SendWorkerDataRequest<I extends WritableComparable, T,
}
/**
- * Create a new {@link ByteArrayVertexIdData} specialized for the use case.
+ * Create a new {@link org.apache.giraph.utils.VertexIdData}
+ * specialized for the use case.
*
- * @return A new instance of {@link ByteArrayVertexIdData}
+ * @return A new instance of
+ * {@link org.apache.giraph.utils.VertexIdData}
*/
- public abstract B createByteArrayVertexIdData();
+ public abstract B createVertexIdData();
@Override
public void readFieldsRequest(DataInput input) throws IOException {
@@ -78,7 +81,7 @@ public abstract class SendWorkerDataRequest<I extends WritableComparable, T,
partitionVertexData.initialize(numPartitions);
while (numPartitions-- > 0) {
final int partitionId = input.readInt();
- B vertexIdData = createByteArrayVertexIdData();
+ B vertexIdData = createVertexIdData();
vertexIdData.setConf(getConf());
vertexIdData.readFields(input);
partitionVertexData.add(partitionId, vertexIdData);
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java
index 793768a..510743f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java
@@ -22,6 +22,7 @@ import org.apache.giraph.comm.ServerData;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.utils.ByteArrayVertexIdEdges;
import org.apache.giraph.utils.PairList;
+import org.apache.giraph.utils.VertexIdEdges;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -35,7 +36,7 @@ import org.apache.hadoop.io.WritableComparable;
public class SendWorkerEdgesRequest<I extends WritableComparable,
E extends Writable>
extends SendWorkerDataRequest<I, Edge<I, E>,
- ByteArrayVertexIdEdges<I, E>> {
+ VertexIdEdges<I, E>> {
/**
* Constructor used for reflection only
*/
@@ -48,13 +49,13 @@ public class SendWorkerEdgesRequest<I extends WritableComparable,
* ByteArrayVertexIdEdges
*/
public SendWorkerEdgesRequest(
- PairList<Integer, ByteArrayVertexIdEdges<I, E>> partVertEdges) {
+ PairList<Integer, VertexIdEdges<I, E>> partVertEdges) {
this.partitionVertexData = partVertEdges;
}
@Override
- public ByteArrayVertexIdEdges<I, E> createByteArrayVertexIdData() {
- return new ByteArrayVertexIdEdges<I, E>();
+ public VertexIdEdges<I, E> createVertexIdData() {
+ return new ByteArrayVertexIdEdges<>();
}
@Override
@@ -64,7 +65,7 @@ public class SendWorkerEdgesRequest<I extends WritableComparable,
@Override
public void doRequest(ServerData serverData) {
- PairList<Integer, ByteArrayVertexIdEdges<I, E>>.Iterator
+ PairList<Integer, VertexIdEdges<I, E>>.Iterator
iterator = partitionVertexData.getIterator();
while (iterator.hasNext()) {
iterator.next();
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
index 3ac0962..d525164 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
@@ -19,6 +19,7 @@
package org.apache.giraph.comm.requests;
import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.utils.VertexIdMessages;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.giraph.utils.PairList;
import org.apache.hadoop.io.Writable;
@@ -34,26 +35,26 @@ import java.io.IOException;
*/
@SuppressWarnings("unchecked")
public class SendWorkerMessagesRequest<I extends WritableComparable,
- M extends Writable>
- extends SendWorkerDataRequest<I, M, ByteArrayVertexIdMessages<I, M>> {
- /**
- * Constructor used for reflection only
- */
- public SendWorkerMessagesRequest() { }
+ M extends Writable> extends SendWorkerDataRequest<I, M,
+ VertexIdMessages<I, M>> {
+
+ /** Default constructor */
+ public SendWorkerMessagesRequest() {
+ }
/**
* Constructor used to send request.
*
* @param partVertMsgs Map of remote partitions =>
- * ByteArrayVertexIdMessages
+ * VertexIdMessages
*/
public SendWorkerMessagesRequest(
- PairList<Integer, ByteArrayVertexIdMessages<I, M>> partVertMsgs) {
+ PairList<Integer, VertexIdMessages<I, M>> partVertMsgs) {
this.partitionVertexData = partVertMsgs;
}
@Override
- public ByteArrayVertexIdMessages<I, M> createByteArrayVertexIdData() {
+ public VertexIdMessages<I, M> createVertexIdData() {
return new ByteArrayVertexIdMessages<I, M>(
getConf().getOutgoingMessageValueFactory());
}
@@ -65,7 +66,7 @@ public class SendWorkerMessagesRequest<I extends WritableComparable,
@Override
public void doRequest(ServerData serverData) {
- PairList<Integer, ByteArrayVertexIdMessages<I, M>>.Iterator
+ PairList<Integer, VertexIdMessages<I, M>>.Iterator
iterator = partitionVertexData.getIterator();
while (iterator.hasNext()) {
iterator.next();
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java
index 181e681..14c8c0d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java
@@ -21,6 +21,7 @@ package org.apache.giraph.comm.requests;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+
import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.hadoop.io.Writable;
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 2862c3e..a32c938 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -847,7 +847,7 @@ public class GiraphConfiguration extends Configuration
if (nettyBufferAllocator == null) {
if (NETTY_USE_POOLED_ALLOCATOR.get(this)) { // Use pooled allocator
nettyBufferAllocator = new PooledByteBufAllocator(
- NETTY_USE_DIRECT_MEMORY.get(this));
+ NETTY_USE_DIRECT_MEMORY.get(this));
} else { // Use un-pooled allocator
// Note: Current default settings create un-pooled heap allocator
nettyBufferAllocator = new UnpooledByteBufAllocator(
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 6b36418..611a0dc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -791,7 +791,7 @@ public interface GiraphConstants {
"Overrides default partition count calculation if not -1");
/** Vertex key space size for
- * {@link org.apache.giraph.partition.SimpleRangeWorkerPartitioner}
+ * {@link org.apache.giraph.partition.SimpleWorkerPartitioner}
*/
String PARTITION_VERTEX_KEY_SPACE_SIZE = "giraph.vertexKeySpaceSize";
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index 95e029d..e9f50f9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -1023,6 +1023,18 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
}
/**
+ * Create extendedDataInput based on extendedDataOutput
+ *
+ * @param extendedDataOutput extendedDataOutput
+ * @return extendedDataInput
+ */
+ public ExtendedDataInput createExtendedDataInput(
+ ExtendedDataOutput extendedDataOutput) {
+ return createExtendedDataInput(extendedDataOutput.getByteArray(), 0,
+ extendedDataOutput.getPos());
+ }
+
+ /**
* Update Computation and MessageCombiner class used
*
* @param superstepClasses SuperstepClasses
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
index 80e909d..ee53718 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
@@ -24,9 +24,10 @@ import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.partition.Partition;
-import org.apache.giraph.utils.ByteArrayVertexIdEdges;
import org.apache.giraph.utils.CallableFactory;
import org.apache.giraph.utils.ProgressableUtils;
+import org.apache.giraph.utils.VertexIdEdgeIterator;
+import org.apache.giraph.utils.VertexIdEdges;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.Progressable;
@@ -146,15 +147,15 @@ public abstract class AbstractEdgeStore<I extends WritableComparable,
* @return out-edges for the vertex
*/
protected abstract OutEdges<I, E> getVertexOutEdges(
- ByteArrayVertexIdEdges<I, E>.VertexIdEdgeIterator vertexIdEdgeIterator,
+ VertexIdEdgeIterator<I, E> vertexIdEdgeIterator,
Map<K, OutEdges<I, E>> partitionEdgesIn);
@Override
public void addPartitionEdges(
- int partitionId, ByteArrayVertexIdEdges<I, E> edges) {
+ int partitionId, VertexIdEdges<I, E> edges) {
Map<K, OutEdges<I, E>> partitionEdges = getPartitionEdges(partitionId);
- ByteArrayVertexIdEdges<I, E>.VertexIdEdgeIterator vertexIdEdgeIterator =
+ VertexIdEdgeIterator<I, E> vertexIdEdgeIterator =
edges.getVertexIdEdgeIterator();
while (vertexIdEdgeIterator.hasNext()) {
vertexIdEdgeIterator.next();
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
index 1150eaf..912e25c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
@@ -18,7 +18,7 @@
package org.apache.giraph.edge;
-import org.apache.giraph.utils.ByteArrayVertexIdEdges;
+import org.apache.giraph.utils.VertexIdEdges;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -38,7 +38,7 @@ public interface EdgeStore<I extends WritableComparable,
* @param partitionId Partition id for the incoming edges.
* @param edges Incoming edges
*/
- void addPartitionEdges(int partitionId, ByteArrayVertexIdEdges<I, E> edges);
+ void addPartitionEdges(int partitionId, VertexIdEdges<I, E> edges);
/**
* Move all edges from temporary storage to their source vertices.
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/edge/SimpleEdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/SimpleEdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/SimpleEdgeStore.java
index 6e2a74f..a6a3356 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/SimpleEdgeStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/SimpleEdgeStore.java
@@ -20,7 +20,7 @@ package org.apache.giraph.edge;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.utils.ByteArrayVertexIdEdges;
+import org.apache.giraph.utils.VertexIdEdgeIterator;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.Progressable;
@@ -101,7 +101,7 @@ public class SimpleEdgeStore<I extends WritableComparable,
@Override
protected OutEdges<I, E> getVertexOutEdges(
- ByteArrayVertexIdEdges<I, E>.VertexIdEdgeIterator vertexIdEdgeIterator,
+ VertexIdEdgeIterator<I, E> vertexIdEdgeIterator,
Map<I, OutEdges<I, E>> partitionEdgesIn) {
ConcurrentMap<I, OutEdges<I, E>> partitionEdges =
(ConcurrentMap<I, OutEdges<I, E>>) partitionEdgesIn;
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/edge/primitives/IntEdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/primitives/IntEdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/primitives/IntEdgeStore.java
index c6b5051..826c685 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/primitives/IntEdgeStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/primitives/IntEdgeStore.java
@@ -22,7 +22,7 @@ import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.AbstractEdgeStore;
import org.apache.giraph.edge.OutEdges;
-import org.apache.giraph.utils.ByteArrayVertexIdEdges;
+import org.apache.giraph.utils.VertexIdEdgeIterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.Progressable;
@@ -112,8 +112,7 @@ public class IntEdgeStore<V extends Writable, E extends Writable>
@Override
protected OutEdges<IntWritable, E> getVertexOutEdges(
- ByteArrayVertexIdEdges<IntWritable, E>.VertexIdEdgeIterator
- vertexIdEdgeIterator,
+ VertexIdEdgeIterator<IntWritable, E> vertexIdEdgeIterator,
Map<Integer, OutEdges<IntWritable, E>> partitionEdgesIn) {
Int2ObjectMap<OutEdges<IntWritable, E>> partitionEdges =
(Int2ObjectMap<OutEdges<IntWritable, E>>) partitionEdgesIn;
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/edge/primitives/LongEdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/primitives/LongEdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/primitives/LongEdgeStore.java
index d4c44c7..486410f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/primitives/LongEdgeStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/primitives/LongEdgeStore.java
@@ -22,7 +22,7 @@ import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.AbstractEdgeStore;
import org.apache.giraph.edge.OutEdges;
-import org.apache.giraph.utils.ByteArrayVertexIdEdges;
+import org.apache.giraph.utils.VertexIdEdgeIterator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.Progressable;
@@ -113,8 +113,7 @@ public class LongEdgeStore<V extends Writable, E extends Writable>
@Override
protected OutEdges<LongWritable, E> getVertexOutEdges(
- ByteArrayVertexIdEdges<LongWritable, E>.VertexIdEdgeIterator
- vertexIdEdgeIterator,
+ VertexIdEdgeIterator<LongWritable, E> vertexIdEdgeIterator,
Map<Long, OutEdges<LongWritable, E>> partitionEdgesIn) {
Long2ObjectMap<OutEdges<LongWritable, E>> partitionEdges =
(Long2ObjectMap<OutEdges<LongWritable, E>>) partitionEdgesIn;
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/AbstractVertexIdData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/AbstractVertexIdData.java b/giraph-core/src/main/java/org/apache/giraph/utils/AbstractVertexIdData.java
new file mode 100644
index 0000000..d669be4
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/AbstractVertexIdData.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_BYTE;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_INT;
+
+/**
+ * Partial implementation of {@link VertexIdData}
+ *
+ * @param <I> vertexId type parameter
+ * @param <T> vertexData type parameter
+ */
+@SuppressWarnings("unchecked")
+public abstract class AbstractVertexIdData<I extends WritableComparable, T>
+ implements VertexIdData<I, T> {
+ /** Extended data output */
+ protected ExtendedDataOutput extendedDataOutput;
+ /** Configuration */
+ private ImmutableClassesGiraphConfiguration<I, ?, ?> configuration;
+
+ @Override
+ public void initialize() {
+ extendedDataOutput = getConf().createExtendedDataOutput();
+ }
+
+ @Override
+ public void initialize(int expectedSize) {
+ extendedDataOutput = getConf().createExtendedDataOutput(expectedSize);
+ }
+
+ @Override
+ public void add(I vertexId, T data) {
+ try {
+ vertexId.write(extendedDataOutput);
+ writeData(extendedDataOutput, data);
+ } catch (IOException e) {
+ throw new IllegalStateException("add: IOException", e);
+ }
+ }
+
+ @Override
+ public void add(byte[] serializedId, int idPos, T data) {
+ try {
+ extendedDataOutput.write(serializedId, 0, idPos);
+ writeData(extendedDataOutput, data);
+ } catch (IOException e) {
+ throw new IllegalStateException("add: IOException", e);
+ }
+ }
+
+ @Override
+ public int getSize() {
+ return extendedDataOutput.getPos();
+ }
+
+
+ @Override
+ public int getSerializedSize() {
+ return SIZE_OF_BYTE + SIZE_OF_INT + getSize();
+ }
+
+
+ @Override
+ public boolean isEmpty() {
+ return extendedDataOutput.getPos() == 0;
+ }
+
+
+ @Override
+ public void clear() {
+ extendedDataOutput.reset();
+ }
+
+ @Override
+ public void setConf(ImmutableClassesGiraphConfiguration configuration) {
+ this.configuration = configuration;
+ }
+
+ @Override
+ public ImmutableClassesGiraphConfiguration<I, ?, ?> getConf() {
+ return configuration;
+ }
+
+ @Override
+ public ByteStructVertexIdDataIterator<I, T> getVertexIdDataIterator() {
+ return new ByteStructVertexIdDataIterator<>(this);
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ throw new UnsupportedOperationException("not supported");
+ }
+
+ @Override
+ public void readFields(DataInput dataInput) throws IOException {
+ throw new UnsupportedOperationException("not supported");
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayIterable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayIterable.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayIterable.java
deleted file mode 100644
index d14172e..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayIterable.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.utils;
-
-import java.util.Iterator;
-import org.apache.hadoop.io.Writable;
-
-/**
- * This iterable is designed to deserialize a byte array on the fly to
- * provide new copies of writable objects when desired. It does not reuse
- * objects, and instead creates a new one for every next().
- *
- * @param <T> Type that extends Writable that will be iterated
- */
-public abstract class ByteArrayIterable<T extends Writable> implements
- Iterable<T> {
- /** Factory for data input */
- protected final Factory<? extends ExtendedDataInput> dataInputFactory;
-
- /**
- * Constructor
- *
- * @param dataInputFactory Factory for data inputs
- */
- public ByteArrayIterable(
- Factory<? extends ExtendedDataInput> dataInputFactory) {
- this.dataInputFactory = dataInputFactory;
- }
-
- /**
- * Must be able to create the writable object
- *
- * @return New writable
- */
- protected abstract T createWritable();
-
- /**
- * Iterator over the internal byte array
- */
- private class ByteArrayIterableIterator extends ByteArrayIterator<T> {
- /**
- * Constructor.
- *
- * @param dataInputFactory Factory for data input
- */
- private ByteArrayIterableIterator(
- Factory<? extends ExtendedDataInput> dataInputFactory) {
- super(dataInputFactory.create());
- }
-
- @Override
- protected T createWritable() {
- return ByteArrayIterable.this.createWritable();
- }
- }
-
- @Override
- public Iterator<T> iterator() {
- return new ByteArrayIterableIterator(dataInputFactory);
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayIterator.java
deleted file mode 100644
index 28b2dc8..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayIterator.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.utils;
-
-import java.io.IOException;
-import java.util.Iterator;
-import org.apache.hadoop.io.Writable;
-
-/**
- * This iterator is designed to deserialize a byte array on the fly to
- * provide new copies of writable objects when desired. It does not reuse
- * objects, and instead creates a new one for every next().
- *
- * @param <T> Type that extends Writable that will be iterated
- */
-public abstract class ByteArrayIterator<T extends Writable> implements
- Iterator<T> {
- /** Data input */
- protected final ExtendedDataInput extendedDataInput;
-
- /**
- * Wrap ExtendedDataInput in ByteArrayIterator
- *
- * @param extendedDataInput ExtendedDataInput
- */
- public ByteArrayIterator(ExtendedDataInput extendedDataInput) {
- this.extendedDataInput = extendedDataInput;
- }
-
- @Override
- public boolean hasNext() {
- return extendedDataInput.available() > 0;
- }
-
- @Override
- public T next() {
- T writable = createWritable();
- try {
- writable.readFields(extendedDataInput);
- } catch (IOException e) {
- throw new IllegalStateException("next: readFields got IOException", e);
- }
- return writable;
- }
-
- @Override
- public void remove() {
- throw new IllegalAccessError("remove: Not supported");
- }
-
- /**
- * Must be able to create the writable object
- *
- * @return New writable
- */
- protected abstract T createWritable();
-}
[2/3] GIRAPH-907: refactor giraph code to support multiple
implementations of vertexId data (pavanka)
Posted by pa...@apache.org.
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java
index 5c56038..f26a888 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java
@@ -18,9 +18,6 @@
package org.apache.giraph.utils;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
@@ -33,122 +30,8 @@ import java.io.IOException;
* @param <I> Vertex id
* @param <T> Data
*/
-public abstract class ByteArrayVertexIdData<I extends WritableComparable, T>
- implements Writable, ImmutableClassesGiraphConfigurable {
- /** Extended data output */
- private ExtendedDataOutput extendedDataOutput;
- /** Configuration */
- private ImmutableClassesGiraphConfiguration<I, ?, ?> configuration;
-
- /**
- * Create a new data object.
- *
- * @return Newly-created data object.
- */
- public abstract T createData();
-
- /**
- * Write a data object to an {@link ExtendedDataOutput}.
- *
- * @param out {@link ExtendedDataOutput}
- * @param data Data object to write
- * @throws IOException
- */
- public abstract void writeData(ExtendedDataOutput out, T data)
- throws IOException;
-
- /**
- * Read a data object's fields from an {@link ExtendedDataInput}.
- *
- * @param in {@link ExtendedDataInput}
- * @param data Data object to fill in-place
- * @throws IOException
- */
- public abstract void readData(ExtendedDataInput in, T data)
- throws IOException;
-
- /**
- * Initialize the inner state. Must be called before {@code add()} is
- * called.
- */
- public void initialize() {
- extendedDataOutput = configuration.createExtendedDataOutput();
- }
-
- /**
- * Initialize the inner state, with a known size. Must be called before
- * {@code add()} is called.
- *
- * @param expectedSize Number of bytes to be expected
- */
- public void initialize(int expectedSize) {
- extendedDataOutput = configuration.createExtendedDataOutput(expectedSize);
- }
-
- /**
- * Add a vertex id and data pair to the collection.
- *
- * @param vertexId Vertex id
- * @param data Data
- */
- public void add(I vertexId, T data) {
- try {
- vertexId.write(extendedDataOutput);
- writeData(extendedDataOutput, data);
- } catch (IOException e) {
- throw new IllegalStateException("add: IOException", e);
- }
- }
-
- /**
- * Add a serialized vertex id and data.
- *
- * @param serializedId The bye array which holds the serialized id.
- * @param idPos The end position of the serialized id in the byte array.
- * @param data Data
- */
- public void add(byte[] serializedId, int idPos, T data) {
- try {
- extendedDataOutput.write(serializedId, 0, idPos);
- writeData(extendedDataOutput, data);
- } catch (IOException e) {
- throw new IllegalStateException("add: IOException", e);
- }
- }
-
- /**
- * Get the number of bytes used.
- *
- * @return Bytes used
- */
- public int getSize() {
- return extendedDataOutput.getPos();
- }
-
- /**
- * Get the size of this object in serialized form.
- *
- * @return The size (in bytes) of the serialized object
- */
- public int getSerializedSize() {
- return 1 + 4 + getSize();
- }
-
- /**
- * Check if the list is empty.
- *
- * @return Whether the list is empty
- */
- public boolean isEmpty() {
- return extendedDataOutput.getPos() == 0;
- }
-
- /**
- * Clear the list.
- */
- public void clear() {
- extendedDataOutput.reset();
- }
+public abstract class ByteArrayVertexIdData<I extends WritableComparable,
+ T> extends AbstractVertexIdData<I, T> {
/**
* Get the underlying byte-array.
@@ -160,16 +43,6 @@ public abstract class ByteArrayVertexIdData<I extends WritableComparable, T>
}
@Override
- public void setConf(ImmutableClassesGiraphConfiguration configuration) {
- this.configuration = configuration;
- }
-
- @Override
- public ImmutableClassesGiraphConfiguration<I, ?, ?> getConf() {
- return configuration;
- }
-
- @Override
public void write(DataOutput dataOutput) throws IOException {
WritableUtils.writeExtendedDataOutput(extendedDataOutput, dataOutput);
}
@@ -177,73 +50,6 @@ public abstract class ByteArrayVertexIdData<I extends WritableComparable, T>
@Override
public void readFields(DataInput dataInput) throws IOException {
extendedDataOutput =
- WritableUtils.readExtendedDataOutput(dataInput, configuration);
- }
-
- /**
- * Get an iterator over the pairs.
- *
- * @return Iterator
- */
- public VertexIdDataIterator getVertexIdDataIterator() {
- return new VertexIdDataIterator();
- }
-
- /**
- * Special iterator that reuses vertex ids and data objects so that the
- * lifetime of the object is only until next() is called.
- *
- * Vertex id ownership can be released if desired through
- * releaseCurrentVertexId(). This optimization allows us to cut down
- * on the number of objects instantiated and garbage collected.
- *
- * Not thread-safe.
- */
- public class VertexIdDataIterator extends VertexIdIterator<I> {
- /** Current data. */
- private T data;
-
- /** Default constructor. */
- public VertexIdDataIterator() {
- super(extendedDataOutput, configuration);
- }
-
- @Override
- public void next() {
- if (vertexId == null) {
- vertexId = configuration.createVertexId();
- }
- if (data == null) {
- data = createData();
- }
- try {
- vertexId.readFields(extendedDataInput);
- readData(extendedDataInput, data);
- } catch (IOException e) {
- throw new IllegalStateException("next: IOException", e);
- }
- }
-
- /**
- * Get the current data.
- *
- * @return Current data
- */
- public T getCurrentData() {
- return data;
- }
-
- /**
- * Release the current data object.
- *
- * @return Released data object
- */
- public T releaseCurrentData() {
- T releasedData = data;
- data = null;
- return releasedData;
- }
+ WritableUtils.readExtendedDataOutput(dataInput, getConf());
}
-
}
-
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
index 762802b..16370c7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
@@ -33,7 +33,8 @@ import java.io.IOException;
*/
@SuppressWarnings("unchecked")
public class ByteArrayVertexIdEdges<I extends WritableComparable,
- E extends Writable> extends ByteArrayVertexIdData<I, Edge<I, E>> {
+ E extends Writable> extends ByteArrayVertexIdData<I, Edge<I,
+ E>> implements VertexIdEdges<I, E> {
/**
* Cast the {@link ImmutableClassesGiraphConfiguration} so it can be used
* to generate edge objects.
@@ -62,37 +63,9 @@ public class ByteArrayVertexIdEdges<I extends WritableComparable,
WritableUtils.readEdge(in, edge);
}
- /**
- * Get an iterator over the pairs.
- *
- * @return Iterator
- */
- public VertexIdEdgeIterator getVertexIdEdgeIterator() {
- return new VertexIdEdgeIterator();
- }
-
- /**
- * Special iterator that reuses vertex ids and edge objects so that the
- * lifetime of the object is only until next() is called.
- */
- public class VertexIdEdgeIterator extends VertexIdDataIterator {
- /**
- * Get the current edge.
- *
- * @return Current edge
- */
- public Edge<I, E> getCurrentEdge() {
- return getCurrentData();
- }
-
- /**
- * Release the current edge.
- *
- * @return Released edge
- */
- public Edge<I, E> releaseCurrentEdge() {
- return releaseCurrentData();
- }
+ @Override
+ public ByteStructVertexIdEdgeIterator<I, E> getVertexIdEdgeIterator() {
+ return new ByteStructVertexIdEdgeIterator<>(this);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
index 0ac8fdf..b3eca3e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.giraph.utils;
import org.apache.giraph.factories.MessageValueFactory;
@@ -33,7 +34,8 @@ import java.io.IOException;
*/
@SuppressWarnings("unchecked")
public class ByteArrayVertexIdMessages<I extends WritableComparable,
- M extends Writable> extends ByteArrayVertexIdData<I, M> {
+ M extends Writable> extends ByteArrayVertexIdData<I, M>
+ implements VertexIdMessages<I, M> {
/** Message value class */
private MessageValueFactory<M> messageValueFactory;
/** Add the message size to the stream? (Depends on the message store) */
@@ -52,7 +54,7 @@ public class ByteArrayVertexIdMessages<I extends WritableComparable,
/**
* Set whether message sizes should be encoded. This should only be a
* possibility when not combining. When combining, all messages need to be
- * deserializd right away, so this won't help.
+ * de-serialized right away, so this won't help.
*/
private void setUseMessageSizeEncoding() {
if (!getConf().useMessageCombiner()) {
@@ -89,45 +91,31 @@ public class ByteArrayVertexIdMessages<I extends WritableComparable,
setUseMessageSizeEncoding();
}
- /**
- * Get specialized iterator that will instiantiate the vertex id and
- * message of this object.
- *
- * @return Special iterator that reuses vertex ids and messages unless
- * specified
- */
- public VertexIdMessageIterator getVertexIdMessageIterator() {
- return new VertexIdMessageIterator();
- }
-
- /**
- * Special iterator that reuses vertex ids and message objects so that the
- * lifetime of the object is only until next() is called.
- */
- public class VertexIdMessageIterator extends VertexIdDataIterator {
- /**
- * Get the current message.
- *
- * @return Current message
- */
- public M getCurrentMessage() {
- return getCurrentData();
- }
+ @Override
+ public ByteStructVertexIdMessageIterator<I, M> getVertexIdMessageIterator() {
+ return new ByteStructVertexIdMessageIterator<>(this);
}
- /**
- * Get specialized iterator that will instiantiate the vertex id and
- * message of this object. It will only produce message bytes, not actual
- * messages and expects a different encoding.
- *
- * @return Special iterator that reuses vertex ids (unless released) and
- * copies message bytes
- */
- public VertexIdMessageBytesIterator getVertexIdMessageBytesIterator() {
+ @Override
+ public ByteStructVertexIdMessageBytesIterator<I, M>
+ getVertexIdMessageBytesIterator() {
if (!useMessageSizeEncoding) {
return null;
}
- return new VertexIdMessageBytesIterator();
+ return new ByteStructVertexIdMessageBytesIterator<I, M>(this) {
+ @Override
+ public void writeCurrentMessageBytes(DataOutput dataOutput) {
+ try {
+ dataOutput.write(extendedDataOutput.getByteArray(),
+ messageOffset, messageBytes);
+ } catch (NegativeArraySizeException e) {
+ VerboseByteStructMessageWrite.handleNegativeArraySize(vertexId);
+ } catch (IOException e) {
+ throw new IllegalStateException("writeCurrentMessageBytes: Got " +
+ "IOException", e);
+ }
+ }
+ };
}
@Override
@@ -141,66 +129,4 @@ public class ByteArrayVertexIdMessages<I extends WritableComparable,
useMessageSizeEncoding = dataInput.readBoolean();
super.readFields(dataInput);
}
-
- /**
- * Special iterator that reuses vertex ids and messages bytes so that the
- * lifetime of the object is only until next() is called.
- *
- * Vertex id ownership can be released if desired through
- * releaseCurrentVertexId(). This optimization allows us to cut down
- * on the number of objects instantiated and garbage collected. Messages
- * can only be copied to an ExtendedDataOutput object
- *
- * Not thread-safe.
- */
- public class VertexIdMessageBytesIterator extends VertexIdDataIterator {
- /** Last message offset */
- private int messageOffset = -1;
- /** Number of bytes in the last message */
- private int messageBytes = -1;
-
- /**
- * Moves to the next element in the iteration.
- */
- @Override
- public void next() {
- if (vertexId == null) {
- vertexId = getConf().createVertexId();
- }
-
- try {
- vertexId.readFields(extendedDataInput);
- messageBytes = extendedDataInput.readInt();
- messageOffset = extendedDataInput.getPos();
- if (extendedDataInput.skipBytes(messageBytes) != messageBytes) {
- throw new IllegalStateException("next: Failed to skip " +
- messageBytes);
- }
- } catch (IOException e) {
- throw new IllegalStateException("next: IOException", e);
- }
- }
-
- /**
- * Write the current message to an ExtendedDataOutput object
- *
- * @param dataOutput Where the current message will be written to
- */
- public void writeCurrentMessageBytes(DataOutput dataOutput) {
- try {
- dataOutput.write(getByteArray(), messageOffset, messageBytes);
- } catch (NegativeArraySizeException e) {
- throw new RuntimeException("The numbers of bytes sent to vertex " +
- vertexId + " exceeded the max capacity of " +
- "its ExtendedDataOutput. Please consider setting " +
- "giraph.useBigDataIOForMessages=true. If there are super-vertices" +
- " in the graph which receive a lot of messages (total serialized " +
- "size of messages goes beyond the maximum size of a byte array), " +
- "setting this option to true will remove that limit");
- } catch (IOException e) {
- throw new IllegalStateException("writeCurrentMessageBytes: Got " +
- "IOException", e);
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterable.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterable.java
new file mode 100644
index 0000000..4e15c1b
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterable.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.util.Iterator;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * This iterable is designed to deserialize a byte array on the fly to
+ * provide new copies of writable objects when desired. It does not reuse
+ * objects, and instead creates a new one for every next().
+ *
+ * @param <T> Type that extends Writable that will be iterated
+ */
+public abstract class ByteStructIterable<T extends Writable> implements
+ Iterable<T> {
+ /** Factory for data input */
+ protected final Factory<? extends ExtendedDataInput> dataInputFactory;
+
+ /**
+ * Constructor
+ *
+ * @param dataInputFactory Factory for data inputs
+ */
+ public ByteStructIterable(
+ Factory<? extends ExtendedDataInput> dataInputFactory) {
+ this.dataInputFactory = dataInputFactory;
+ }
+
+ /**
+ * Must be able to create the writable object
+ *
+ * @return New writable
+ */
+ protected abstract T createWritable();
+
+ @Override
+ public Iterator<T> iterator() {
+ return new ByteStructIterator<T>(dataInputFactory.create()) {
+ @Override
+ protected T createWritable() {
+ return ByteStructIterable.this.createWritable();
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterator.java
new file mode 100644
index 0000000..322365c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterator.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * This iterator is designed to deserialize a byte array on the fly to
+ * provide new copies of writable objects when desired. It does not reuse
+ * objects, and instead creates a new one for every next().
+ *
+ * @param <T> Type that extends Writable that will be iterated
+ */
+public abstract class ByteStructIterator<T extends Writable> implements
+ Iterator<T> {
+ /** Data input */
+ protected final ExtendedDataInput extendedDataInput;
+
+ /**
+ * Wrap ExtendedDataInput in ByteStructIterator
+ *
+ * @param extendedDataInput ExtendedDataInput
+ */
+ public ByteStructIterator(ExtendedDataInput extendedDataInput) {
+ this.extendedDataInput = extendedDataInput;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return extendedDataInput.available() > 0;
+ }
+
+ @Override
+ public T next() {
+ T writable = createWritable();
+ try {
+ writable.readFields(extendedDataInput);
+ } catch (IOException e) {
+ throw new IllegalStateException("next: readFields got IOException", e);
+ }
+ return writable;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("remove: Not supported");
+ }
+
+ /**
+ * Must be able to create the writable object
+ *
+ * @return New writable
+ */
+ protected abstract T createWritable();
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdDataIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdDataIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdDataIterator.java
new file mode 100644
index 0000000..cefec0e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdDataIterator.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+import javax.annotation.concurrent.NotThreadSafe;
+
+/**
+ * Special iterator that reuses vertex ids and data objects so that the
+ * lifetime of the object is only until next() is called.
+ *
+ * Vertex id ownership can be released if desired through
+ * releaseCurrentVertexId(), and data ownership through
+ * releaseCurrentData(). After a release, the following next() creates a
+ * replacement object before reading. This optimization allows us to cut
+ * down on the number of objects instantiated and garbage collected.
+ *
+ * @param <I> vertexId type parameter
+ * @param <T> vertexData type parameter
+ */
+@NotThreadSafe
+public class ByteStructVertexIdDataIterator<I extends WritableComparable, T>
+ extends ByteStructVertexIdIterator<I> implements VertexIdDataIterator<I, T> {
+ /** VertexIdData to iterate over; also creates and reads the data objects */
+ protected AbstractVertexIdData<I, T> vertexIdData;
+ /** Current data, null before the first next() and after a release. */
+ private T data;
+
+ /**
+ * Constructor
+ *
+ * @param vertexIdData vertexIdData whose extendedDataOutput and
+ * configuration are handed to the parent iterator
+ */
+ public ByteStructVertexIdDataIterator(
+ AbstractVertexIdData<I, T> vertexIdData) {
+ super(vertexIdData.extendedDataOutput, vertexIdData.getConf());
+ this.vertexIdData = vertexIdData;
+ }
+
+ @Override
+ public void next() {
+ if (vertexId == null) {
+ vertexId = vertexIdData.getConf().createVertexId();
+ }
+ if (data == null) {
+ data = vertexIdData.createData();
+ }
+ try {
+ vertexId.readFields(extendedDataInput);
+ vertexIdData.readData(extendedDataInput, data);
+ } catch (IOException e) {
+ throw new IllegalStateException("next: IOException", e);
+ }
+ }
+
+ @Override
+ public T getCurrentData() {
+ return data;
+ }
+
+ @Override
+ public T releaseCurrentData() {
+ T releasedData = data;
+ data = null;
+ return releasedData;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdEdgeIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdEdgeIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdEdgeIterator.java
new file mode 100644
index 0000000..7e06038
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdEdgeIterator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Special iterator that reuses vertex ids and edge objects so that the
+ * lifetime of the object is only until next() is called.
+ *
+ * The edge is the data element of the parent
+ * ByteStructVertexIdDataIterator: getCurrentEdge() and
+ * releaseCurrentEdge() delegate to getCurrentData() and
+ * releaseCurrentData() respectively.
+ *
+ * @param <I> vertexId type parameter
+ * @param <E> edge type parameter
+ */
+public class ByteStructVertexIdEdgeIterator<I extends WritableComparable,
+ E extends Writable> extends ByteStructVertexIdDataIterator<I, Edge<I, E>>
+ implements VertexIdEdgeIterator<I, E> {
+
+ /**
+ * Constructor
+ *
+ * @param vertexIdData vertexIdData holding the vertex id / edge pairs
+ * to iterate over
+ */
+ public ByteStructVertexIdEdgeIterator(
+ AbstractVertexIdData<I, Edge<I, E>> vertexIdData) {
+ super(vertexIdData);
+ }
+
+ @Override
+ public Edge<I, E> getCurrentEdge() {
+ return getCurrentData();
+ }
+
+
+ @Override
+ public Edge<I, E> releaseCurrentEdge() {
+ return releaseCurrentData();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdIterator.java
new file mode 100644
index 0000000..3d564cd
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdIterator.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Common implementation for VertexIdEdgeIterator, VertexIdMessageIterator
+ * and VertexIdMessageBytesIterator.
+ *
+ * Holds the data input and the current vertex id. hasNext() reports true
+ * as long as the data input has bytes available. Advancing the iteration
+ * is left to subclasses.
+ *
+ * @param <I> Vertex id
+ */
+public abstract class ByteStructVertexIdIterator<I extends WritableComparable>
+ implements VertexIdIterator<I> {
+ /** Reader of the serialized data */
+ protected final ExtendedDataInput extendedDataInput;
+
+ /** Current vertex id, null after releaseCurrentVertexId() */
+ protected I vertexId;
+
+ /**
+ * Constructor. Creates the data input from the given output through
+ * conf.createExtendedDataInput().
+ *
+ * @param extendedDataOutput Extended data output
+ * @param conf Configuration
+ * @throws IllegalStateException if either argument is null
+ */
+ public ByteStructVertexIdIterator(
+ ExtendedDataOutput extendedDataOutput,
+ ImmutableClassesGiraphConfiguration<I, ?, ?> conf) {
+ if (extendedDataOutput != null && conf != null) {
+ extendedDataInput = conf.createExtendedDataInput(extendedDataOutput);
+ } else {
+ throw new IllegalStateException("Cannot instantiate vertexIdIterator " +
+ "with null arguments");
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return extendedDataInput.available() > 0;
+ }
+
+ @Override
+ public I getCurrentVertexId() {
+ return vertexId;
+ }
+
+ @Override
+ public I releaseCurrentVertexId() {
+ I releasedVertexId = vertexId;
+ vertexId = null;
+ return releasedVertexId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageBytesIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageBytesIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageBytesIterator.java
new file mode 100644
index 0000000..c0a86c9
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageBytesIterator.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+import javax.annotation.concurrent.NotThreadSafe;
+
+/**
+ * Special iterator that reuses vertex ids and messages bytes so that the
+ * lifetime of the object is only until next() is called.
+ *
+ * Vertex id ownership can be released if desired through
+ * releaseCurrentVertexId(). This optimization allows us to cut down
+ * on the number of objects instantiated and garbage collected. Messages
+ * can only be copied to an ExtendedDataOutput object
+ *
+ * The next() of this class does not deserialize the message: it only
+ * records where the message bytes are (messageOffset, messageBytes) and
+ * skips over them.
+ *
+ * @param <I> vertexId type parameter
+ * @param <M> message type parameter
+ */
+@NotThreadSafe
+public abstract class ByteStructVertexIdMessageBytesIterator<I
+ extends WritableComparable, M extends Writable>
+ extends ByteStructVertexIdDataIterator<I, M>
+ implements VertexIdMessageBytesIterator<I, M> {
+ /** Last message offset in the data input, -1 before the first next() */
+ protected int messageOffset = -1;
+ /** Number of bytes in the last message, -1 before the first next() */
+ protected int messageBytes = -1;
+
+ /**
+ * Constructor with vertexIdData
+ *
+ * @param vertexIdData vertexIdData
+ */
+ public ByteStructVertexIdMessageBytesIterator(
+ AbstractVertexIdData<I, M> vertexIdData) {
+ super(vertexIdData);
+ }
+
+ /**
+ * Moves to the next element in the iteration.
+ *
+ * Reads the vertex id (creating a new one first if the previous one was
+ * released), then reads an int that is taken as the message length,
+ * remembers the input position as the message offset and skips the
+ * message bytes. Throws IllegalStateException if fewer bytes than
+ * expected could be skipped or if reading fails with an IOException.
+ */
+ @Override
+ public void next() {
+ if (vertexId == null) {
+ vertexId = vertexIdData.getConf().createVertexId();
+ }
+
+ try {
+ vertexId.readFields(extendedDataInput);
+ messageBytes = extendedDataInput.readInt();
+ messageOffset = extendedDataInput.getPos();
+ if (extendedDataInput.skipBytes(messageBytes) != messageBytes) {
+ throw new IllegalStateException("next: Failed to skip " +
+ messageBytes);
+ }
+ } catch (IOException e) {
+ throw new IllegalStateException("next: IOException", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageIterator.java
new file mode 100644
index 0000000..b686211
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageIterator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Special iterator that reuses vertex ids and message objects so that the
+ * lifetime of the object is only until next() is called.
+ *
+ * The message is the data element of the parent
+ * ByteStructVertexIdDataIterator: getCurrentMessage() delegates to
+ * getCurrentData().
+ *
+ * @param <I> vertexId type parameter
+ * @param <M> message type parameter
+ */
+public class ByteStructVertexIdMessageIterator<I extends WritableComparable,
+ M extends Writable> extends ByteStructVertexIdDataIterator<I, M>
+ implements VertexIdMessageIterator<I, M> {
+
+ /**
+ * Constructor with vertexIdData
+ *
+ * @param vertexIdData vertexIdData holding the vertex id / message
+ * pairs to iterate over
+ */
+ public ByteStructVertexIdMessageIterator(
+ AbstractVertexIdData<I, M> vertexIdData) {
+ super(vertexIdData);
+ }
+
+ @Override
+ public M getCurrentMessage() {
+ return getCurrentData();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteUtils.java
new file mode 100644
index 0000000..52d959e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteUtils.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+/**
+ * Utilities class for byte operations and constants. Currently it only
+ * holds the sizes, in bytes, of the Java primitive types and cannot be
+ * instantiated.
+ */
+public class ByteUtils {
+ /** Bytes used in a boolean */
+ public static final int SIZE_OF_BOOLEAN = 1;
+ /** Bytes used in a byte */
+ public static final int SIZE_OF_BYTE = 1;
+ /** Bytes used in a char */
+ public static final int SIZE_OF_CHAR = Character.SIZE / Byte.SIZE;
+ /** Bytes used in a short */
+ public static final int SIZE_OF_SHORT = Short.SIZE / Byte.SIZE;
+ /** Bytes used in an int */
+ public static final int SIZE_OF_INT = Integer.SIZE / Byte.SIZE;
+ /** Bytes used in a long */
+ public static final int SIZE_OF_LONG = Long.SIZE / Byte.SIZE;
+ /** Bytes used in a float */
+ public static final int SIZE_OF_FLOAT = Float.SIZE / Byte.SIZE;
+ /** Bytes used in a double */
+ public static final int SIZE_OF_DOUBLE = Double.SIZE / Byte.SIZE;
+
+ /**
+ * Private Constructor, the class only exposes static constants
+ */
+ private ByteUtils() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java
index 0ecea77..3eae25b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java
@@ -26,7 +26,7 @@ import java.io.IOException;
* Provides access to a internals of ByteArrayInputStream
*/
public class ExtendedByteArrayDataInput extends ByteArrayInputStream
- implements ExtendedDataInput {
+ implements ExtendedDataInput {
/** Internal data input */
private final DataInput dataInput;
/**
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java
index 0ff366d..9988b15 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java
@@ -140,10 +140,15 @@ public class ExtendedByteArrayDataOutput extends ByteArrayOutputStream
}
@Override
- public void skipBytes(int bytesToSkip) {
- if ((count + bytesToSkip) > buf.length) {
- buf = Arrays.copyOf(buf, Math.max(buf.length << 1, count + bytesToSkip));
+ public void ensureWritable(int minSize) {
+ if ((count + minSize) > buf.length) {
+ buf = Arrays.copyOf(buf, Math.max(buf.length << 1, count + minSize));
}
+ }
+
+ @Override
+ public void skipBytes(int bytesToSkip) {
+ ensureWritable(bytesToSkip);
count += bytesToSkip;
}
@@ -161,6 +166,15 @@ public class ExtendedByteArrayDataOutput extends ByteArrayOutputStream
}
@Override
+ public byte[] toByteArray(int offset, int length) {
+ // The slice must lie within the bytes written so far (count), not
+ // merely within the capacity of buf
+ if (offset + length > count) {
+ throw new IndexOutOfBoundsException(String.format("Offset: %d + " +
+ "Length: %d exceeds the size of buf : %d", offset, length, count));
+ }
+ // Arrays.copyOfRange takes an exclusive end index, not a length, so the
+ // end of the slice is offset + length
+ return Arrays.copyOfRange(buf, offset, offset + length);
+ }
+
+ @Override
public byte[] getByteArray() {
return buf;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java
index 54ef514..bc979af 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java
@@ -24,6 +24,14 @@ import java.io.DataOutput;
*/
public interface ExtendedDataOutput extends DataOutput {
/**
+ * Ensure that backing byte structure has at least minSize
+ * additional bytes
+ *
+ * @param minSize additional size required
+ */
+ void ensureWritable(int minSize);
+
+ /**
* Skip some number of bytes.
*
* @param bytesToSkip Number of bytes to skip
@@ -61,7 +69,17 @@ public interface ExtendedDataOutput extends DataOutput {
byte[] toByteArray();
/**
+ * Return a copy of a slice of the byte array
+ *
+ * @param offset offset of array
+ * @param length length of slice
+ * @return byte array
+ */
+ byte[] toByteArray(int offset, int length);
+
+ /**
* Clears the buffer
*/
void reset();
}
+
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterable.java b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterable.java
deleted file mode 100644
index 2c24e89..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterable.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.utils;
-
-import java.util.Iterator;
-import org.apache.hadoop.io.Writable;
-
-/**
- * The objects provided by the iterators generated from this object have
- * lifetimes only until next() is called. In that sense, the object
- * provided is only a representative object.
- *
- * @param <T> Type that extends Writable that will be iterated
- */
-public abstract class RepresentativeByteArrayIterable<T extends Writable>
- extends ByteArrayIterable<T> {
- /**
- * Constructor
- *
- * @param dataInputFactory Factory for data inputs
- */
- public RepresentativeByteArrayIterable(
- Factory<? extends ExtendedDataInput> dataInputFactory) {
- super(dataInputFactory);
- }
-
- /**
- * Iterator over the internal byte array
- */
- private class RepresentativeByteArrayIterableIterator extends
- RepresentativeByteArrayIterator<T> {
- /**
- * Constructor.
- *
- * @param extendedDataInput ExtendedDataInput
- */
- private RepresentativeByteArrayIterableIterator(
- ExtendedDataInput extendedDataInput) {
- super(extendedDataInput);
- }
-
- @Override
- protected T createWritable() {
- return RepresentativeByteArrayIterable.this.createWritable();
- }
- }
-
- @Override
- public Iterator<T> iterator() {
- return
- new RepresentativeByteArrayIterableIterator(dataInputFactory.create());
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterator.java
deleted file mode 100644
index d36c94f..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterator.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.utils;
-
-import java.io.IOException;
-import org.apache.hadoop.io.Writable;
-
-/**
- * The objects provided by this iterator have lifetimes only until next() is
- * called. In that sense, the object provided is only a representative object.
- *
- * @param <T> Type that extends Writable that will be iterated
- */
-public abstract class RepresentativeByteArrayIterator<T extends
- Writable> extends ByteArrayIterator<T> {
- /** Representative writable */
- private final T representativeWritable = createWritable();
-
- /**
- * Wrap ExtendedDataInput in ByteArrayIterator
- *
- * @param extendedDataInput ExtendedDataInput
- */
- public RepresentativeByteArrayIterator(ExtendedDataInput extendedDataInput) {
- super(extendedDataInput);
- }
-
- @Override
- public T next() {
- try {
- representativeWritable.readFields(extendedDataInput);
- } catch (IOException e) {
- throw new IllegalStateException("next: readFields got IOException", e);
- }
- return representativeWritable;
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteStructIterable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteStructIterable.java b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteStructIterable.java
new file mode 100644
index 0000000..0859010
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteStructIterable.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import java.util.Iterator;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * The objects provided by the iterators generated from this object have
+ * lifetimes only until next() is called. In that sense, the object
+ * provided is only a representative object.
+ *
+ * Each call to iterator() creates a RepresentativeByteStructIterator over
+ * a data input obtained from dataInputFactory; that iterator gets its
+ * writable from the createWritable() of this iterable.
+ *
+ * @param <T> Type that extends Writable that will be iterated
+ */
+public abstract class RepresentativeByteStructIterable<T extends Writable>
+ extends ByteStructIterable<T> {
+ /**
+ * Constructor
+ *
+ * @param dataInputFactory Factory for data inputs
+ */
+ public RepresentativeByteStructIterable(
+ Factory<? extends ExtendedDataInput> dataInputFactory) {
+ super(dataInputFactory);
+ }
+
+ @Override
+ public Iterator<T> iterator() {
+ return new RepresentativeByteStructIterator<T>(dataInputFactory.create()) {
+ @Override
+ protected T createWritable() {
+ return RepresentativeByteStructIterable.this.createWritable();
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteStructIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteStructIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteStructIterator.java
new file mode 100644
index 0000000..0bf98ad
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteStructIterator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.IOException;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * The objects provided by this iterator have lifetimes only until next() is
+ * called. In that sense, the object provided is only a representative object.
+ *
+ * Every next() reads the following element into the same writable instance
+ * and returns that instance, so callers that need to keep a value must
+ * copy it before calling next() again.
+ *
+ * @param <T> Type that extends Writable that will be iterated
+ */
+public abstract class RepresentativeByteStructIterator<T extends
+ Writable> extends ByteStructIterator<T> {
+ /** Representative writable, created once and refilled by every next() */
+ private final T representativeWritable = createWritable();
+
+ /**
+ * Wrap ExtendedDataInput in RepresentativeByteStructIterator
+ *
+ * @param extendedDataInput ExtendedDataInput to read the writables from
+ */
+ public RepresentativeByteStructIterator(ExtendedDataInput extendedDataInput) {
+ super(extendedDataInput);
+ }
+
+ @Override
+ public T next() {
+ try {
+ representativeWritable.readFields(extendedDataInput);
+ } catch (IOException e) {
+ throw new IllegalStateException("next: readFields got IOException", e);
+ }
+ return representativeWritable;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/RequestUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/RequestUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/RequestUtils.java
new file mode 100644
index 0000000..2c56cb8
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/RequestUtils.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import org.apache.giraph.comm.requests.WritableRequest;
+
+import java.io.IOException;
+import org.apache.log4j.Logger;
+
+/**
+ * RequestUtils utility class. Holds static helpers for working with
+ * WritableRequest objects and cannot be instantiated.
+ */
+public class RequestUtils {
+ /** Logger */
+ public static final Logger LOG = Logger.getLogger(RequestUtils.class);
+
+ /**
+ * Private Constructor
+ */
+ private RequestUtils() {
+ }
+
+ /**
+ * Decode a writable request: wraps the buffer in a ByteBufInputStream
+ * and lets the given request read its fields from it.
+ *
+ * @param buf ByteBuf holding the serialized request
+ * @param request writableRequest to populate
+ * @return the same request instance, after readFields() has been called
+ * @throws IOException if reading the request fields fails
+ */
+ public static WritableRequest decodeWritableRequest(ByteBuf buf,
+ WritableRequest request) throws IOException {
+ ByteBufInputStream input = new ByteBufInputStream(buf);
+ request.readFields(input);
+ return request;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java
new file mode 100644
index 0000000..db19fda
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_BOOLEAN;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_BYTE;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_CHAR;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_SHORT;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_INT;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_LONG;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_FLOAT;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_DOUBLE;
+
+/**
+ * Byte array input stream that uses Unsafe methods to deserialize
+ * much faster.
+ *
+ * Every primitive read first checks the remaining bytes through
+ * ensureRemaining, then fetches the value straight out of the backing
+ * byte[] with sun.misc.Unsafe and advances the position by the size of
+ * the value.
+ *
+ * NOTE(review): multi-byte values are fetched with Unsafe.getXxx, so they
+ * are presumably in the platform's native byte order rather than the
+ * big-endian layout of java.io.DataInput - confirm the writer side matches.
+ */
+public class UnsafeArrayReads extends UnsafeReads {
+ /** Access to the unsafe class */
+ private static final sun.misc.Unsafe UNSAFE;
+ static {
+ try {
+ // The Unsafe singleton is only reachable reflectively through its
+ // private static "theUnsafe" field
+ Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
+ field.setAccessible(true);
+ UNSAFE = (sun.misc.Unsafe) field.get(null);
+ // Checkstyle exception due to needing to check if unsafe is allowed
+ // CHECKSTYLE: stop IllegalCatch
+ } catch (Exception e) {
+ // CHECKSTYLE: resume IllegalCatch
+ throw new RuntimeException("UnsafeArrayReads: Failed to " +
+ "get unsafe", e);
+ }
+ }
+ /** Offset of the first element of a byte array, as reported by Unsafe */
+ private static final long BYTE_ARRAY_OFFSET =
+ UNSAFE.arrayBaseOffset(byte[].class);
+
+ /** Byte buffer */
+ private final byte[] buf;
+
+ /**
+ * Constructor. Reading starts at position 0 and the buffer length is
+ * the length of the array.
+ *
+ * @param buf Buffer to read from
+ */
+ public UnsafeArrayReads(byte[] buf) {
+ super(buf.length);
+ this.buf = buf;
+ }
+
+ /**
+ * Constructor.
+ *
+ * The position starts at offset and the buffer length is set to length,
+ * so {@link #available()} initially reports length - offset.
+ *
+ * @param buf Buffer to read from
+ * @param offset Offset in the buffer to start reading from
+ * @param length Max length of the buffer to read
+ */
+ public UnsafeArrayReads(byte[] buf, int offset, int length) {
+ super(offset, length);
+ this.buf = buf;
+ }
+
+ /**
+ * Number of bytes between the current position and the buffer length.
+ *
+ * @return Number of bytes available
+ */
+ @Override
+ public int available() {
+ return (int) (bufLength - pos);
+ }
+
+
+ /**
+ * Current position, narrowed from the long kept by the parent class.
+ *
+ * @return Position
+ */
+ @Override
+ public int getPos() {
+ return (int) pos;
+ }
+
+ @Override
+ public void readFully(byte[] b) throws IOException {
+ ensureRemaining(b.length);
+ // pos is a long in the parent class, array indices are ints
+ System.arraycopy(buf, (int) pos, b, 0, b.length);
+ pos += b.length;
+ }
+
+ @Override
+ public void readFully(byte[] b, int off, int len) throws IOException {
+ ensureRemaining(len);
+ // pos is a long in the parent class, array indices are ints
+ System.arraycopy(buf, (int) pos, b, off, len);
+ pos += len;
+ }
+
+ @Override
+ public boolean readBoolean() throws IOException {
+ ensureRemaining(SIZE_OF_BOOLEAN);
+ boolean value = UNSAFE.getBoolean(buf,
+ BYTE_ARRAY_OFFSET + pos);
+ pos += SIZE_OF_BOOLEAN;
+ return value;
+ }
+
+ @Override
+ public byte readByte() throws IOException {
+ ensureRemaining(SIZE_OF_BYTE);
+ byte value = UNSAFE.getByte(buf,
+ BYTE_ARRAY_OFFSET + pos);
+ pos += SIZE_OF_BYTE;
+ return value;
+ }
+
+ @Override
+ public int readUnsignedByte() throws IOException {
+ // Mask off the sign extension so the result is in the range 0..255
+ return (short) (readByte() & 0xFF);
+ }
+
+ @Override
+ public short readShort() throws IOException {
+ ensureRemaining(SIZE_OF_SHORT);
+ short value = UNSAFE.getShort(buf,
+ BYTE_ARRAY_OFFSET + pos);
+ pos += SIZE_OF_SHORT;
+ return value;
+ }
+
+ @Override
+ public int readUnsignedShort() throws IOException {
+ // Mask off the sign extension so the result is in the range 0..65535
+ return readShort() & 0xFFFF;
+ }
+
+ @Override
+ public char readChar() throws IOException {
+ ensureRemaining(SIZE_OF_CHAR);
+ char value = UNSAFE.getChar(buf,
+ BYTE_ARRAY_OFFSET + pos);
+ pos += SIZE_OF_CHAR;
+ return value;
+ }
+
+ @Override
+ public int readInt() throws IOException {
+ ensureRemaining(SIZE_OF_INT);
+ int value = UNSAFE.getInt(buf,
+ BYTE_ARRAY_OFFSET + pos);
+ pos += SIZE_OF_INT;
+ return value;
+ }
+
+ @Override
+ public long readLong() throws IOException {
+ ensureRemaining(SIZE_OF_LONG);
+ long value = UNSAFE.getLong(buf,
+ BYTE_ARRAY_OFFSET + pos);
+ pos += SIZE_OF_LONG;
+ return value;
+ }
+
+ @Override
+ public float readFloat() throws IOException {
+ ensureRemaining(SIZE_OF_FLOAT);
+ float value = UNSAFE.getFloat(buf,
+ BYTE_ARRAY_OFFSET + pos);
+ pos += SIZE_OF_FLOAT;
+ return value;
+ }
+
+ @Override
+ public double readDouble() throws IOException {
+ ensureRemaining(SIZE_OF_DOUBLE);
+ double value = UNSAFE.getDouble(buf,
+ BYTE_ARRAY_OFFSET + pos);
+ pos += SIZE_OF_DOUBLE;
+ return value;
+ }
+
+ /**
+ * Get an int at an arbitrary position in a byte[].
+ *
+ * No bounds check is performed here; the caller has to make sure that
+ * pos and the bytes following it lie inside buf.
+ *
+ * @param buf Buffer to get the int from
+ * @param pos Position in the buffer to get the int from
+ * @return Int at the buffer position
+ */
+ public static int getInt(byte[] buf, int pos) {
+ return UNSAFE.getInt(buf,
+ BYTE_ARRAY_OFFSET + pos);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayInputStream.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayInputStream.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayInputStream.java
index 20ed92b..c8a8cac 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayInputStream.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayInputStream.java
@@ -15,52 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.giraph.utils;
-import java.io.IOException;
-import java.io.UTFDataFormatException;
-import java.lang.reflect.Field;
-import org.apache.log4j.Logger;
+package org.apache.giraph.utils;
/**
- * Byte array output stream that uses Unsafe methods to serialize/deserialize
- * much faster
+ * UnsafeByteArrayInputStream
*/
-public class UnsafeByteArrayInputStream implements ExtendedDataInput {
- /** Class logger */
- private static final Logger LOG = Logger.getLogger(
- UnsafeByteArrayInputStream.class);
- /** Access to the unsafe class */
- private static final sun.misc.Unsafe UNSAFE;
- static {
- try {
- Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
- field.setAccessible(true);
- UNSAFE = (sun.misc.Unsafe) field.get(null);
- // Checkstyle exception due to needing to check if unsafe is allowed
- // CHECKSTYLE: stop IllegalCatch
- } catch (Exception e) {
- // CHECKSTYLE: resume IllegalCatch
- throw new RuntimeException("UnsafeByteArrayOutputStream: Failed to " +
- "get unsafe", e);
- }
- }
- /** Offset of a byte array */
- private static final long BYTE_ARRAY_OFFSET =
- UNSAFE.arrayBaseOffset(byte[].class);
- /** Offset of a long array */
- private static final long LONG_ARRAY_OFFSET =
- UNSAFE.arrayBaseOffset(long[].class);
- /** Offset of a double array */
- private static final long DOUBLE_ARRAY_OFFSET =
- UNSAFE.arrayBaseOffset(double[].class);
-
- /** Byte buffer */
- private final byte[] buf;
- /** Buffer length */
- private final int bufLength;
- /** Position in the buffer */
- private int pos = 0;
+public class UnsafeByteArrayInputStream extends UnsafeArrayReads {
/**
* Constructor
@@ -68,8 +29,7 @@ public class UnsafeByteArrayInputStream implements ExtendedDataInput {
* @param buf Buffer to read from
*/
public UnsafeByteArrayInputStream(byte[] buf) {
- this.buf = buf;
- this.bufLength = buf.length;
+ super(buf);
}
/**
@@ -80,275 +40,6 @@ public class UnsafeByteArrayInputStream implements ExtendedDataInput {
* @param length Max length of the buffer to read
*/
public UnsafeByteArrayInputStream(byte[] buf, int offset, int length) {
- this.buf = buf;
- this.pos = offset;
- this.bufLength = length;
- }
-
- /**
- * How many bytes are still available?
- *
- * @return Number of bytes available
- */
- public int available() {
- return bufLength - pos;
- }
-
- /**
- * What position in the stream?
- *
- * @return Position
- */
- public int getPos() {
- return pos;
- }
-
- /**
- * Check whether there are enough remaining bytes for an operation
- *
- * @param requiredBytes Bytes required to read
- * @throws IOException When there are not enough bytes to read
- */
- private void ensureRemaining(int requiredBytes) throws IOException {
- if (bufLength - pos < requiredBytes) {
- throw new IOException("ensureRemaining: Only " + (bufLength - pos) +
- " bytes remaining, trying to read " + requiredBytes);
- }
- }
-
- @Override
- public void readFully(byte[] b) throws IOException {
- ensureRemaining(b.length);
- System.arraycopy(buf, pos, b, 0, b.length);
- pos += b.length;
- }
-
- @Override
- public void readFully(byte[] b, int off, int len) throws IOException {
- ensureRemaining(len);
- System.arraycopy(buf, pos, b, off, len);
- pos += len;
- }
-
- @Override
- public int skipBytes(int n) throws IOException {
- ensureRemaining(n);
- pos += n;
- return n;
- }
-
- @Override
- public boolean readBoolean() throws IOException {
- ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_BOOLEAN);
- boolean value = UNSAFE.getBoolean(buf,
- BYTE_ARRAY_OFFSET + pos);
- pos += UnsafeByteArrayOutputStream.SIZE_OF_BOOLEAN;
- return value;
- }
-
- @Override
- public byte readByte() throws IOException {
- ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_BYTE);
- byte value = UNSAFE.getByte(buf,
- BYTE_ARRAY_OFFSET + pos);
- pos += UnsafeByteArrayOutputStream.SIZE_OF_BYTE;
- return value;
- }
-
- @Override
- public int readUnsignedByte() throws IOException {
- return (short) (readByte() & 0xFF);
- }
-
- @Override
- public short readShort() throws IOException {
- ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_SHORT);
- short value = UNSAFE.getShort(buf,
- BYTE_ARRAY_OFFSET + pos);
- pos += UnsafeByteArrayOutputStream.SIZE_OF_SHORT;
- return value;
- }
-
- @Override
- public int readUnsignedShort() throws IOException {
- return readShort() & 0xFFFF;
- }
-
- @Override
- public char readChar() throws IOException {
- ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_CHAR);
- char value = UNSAFE.getChar(buf,
- BYTE_ARRAY_OFFSET + pos);
- pos += UnsafeByteArrayOutputStream.SIZE_OF_CHAR;
- return value;
- }
-
- @Override
- public int readInt() throws IOException {
- ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_INT);
- int value = UNSAFE.getInt(buf,
- BYTE_ARRAY_OFFSET + pos);
- pos += UnsafeByteArrayOutputStream.SIZE_OF_INT;
- return value;
- }
-
- @Override
- public long readLong() throws IOException {
- ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_LONG);
- long value = UNSAFE.getLong(buf,
- BYTE_ARRAY_OFFSET + pos);
- pos += UnsafeByteArrayOutputStream.SIZE_OF_LONG;
- return value;
- }
-
- @Override
- public float readFloat() throws IOException {
- ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_FLOAT);
- float value = UNSAFE.getFloat(buf,
- BYTE_ARRAY_OFFSET + pos);
- pos += UnsafeByteArrayOutputStream.SIZE_OF_FLOAT;
- return value;
- }
-
- @Override
- public double readDouble() throws IOException {
- ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_DOUBLE);
- double value = UNSAFE.getDouble(buf,
- BYTE_ARRAY_OFFSET + pos);
- pos += UnsafeByteArrayOutputStream.SIZE_OF_DOUBLE;
- return value;
- }
-
- @Override
- public String readLine() throws IOException {
- // Note that this code is mostly copied from DataInputStream
- char[] tmpBuf = new char[128];
-
- int room = tmpBuf.length;
- int offset = 0;
- int c;
-
- loop:
- while (true) {
- c = readByte();
- switch (c) {
- case -1:
- case '\n':
- break loop;
- case '\r':
- int c2 = readByte();
- if ((c2 != '\n') && (c2 != -1)) {
- pos -= 1;
- }
- break loop;
- default:
- if (--room < 0) {
- char[] replacebuf = new char[offset + 128];
- room = replacebuf.length - offset - 1;
- System.arraycopy(tmpBuf, 0, replacebuf, 0, offset);
- tmpBuf = replacebuf;
- }
- tmpBuf[offset++] = (char) c;
- break;
- }
- }
- if ((c == -1) && (offset == 0)) {
- return null;
- }
- return String.copyValueOf(tmpBuf, 0, offset);
- }
-
- @Override
- public String readUTF() throws IOException {
- // Note that this code is mostly copied from DataInputStream
- int utflen = readUnsignedShort();
-
- byte[] bytearr = new byte[utflen];
- char[] chararr = new char[utflen];
-
- int c;
- int char2;
- int char3;
- int count = 0;
- int chararrCount = 0;
-
- readFully(bytearr, 0, utflen);
-
- while (count < utflen) {
- c = (int) bytearr[count] & 0xff;
- if (c > 127) {
- break;
- }
- count++;
- chararr[chararrCount++] = (char) c;
- }
-
- while (count < utflen) {
- c = (int) bytearr[count] & 0xff;
- switch (c >> 4) {
- case 0:
- case 1:
- case 2:
- case 3:
- case 4:
- case 5:
- case 6:
- case 7:
- /* 0xxxxxxx */
- count++;
- chararr[chararrCount++] = (char) c;
- break;
- case 12:
- case 13:
- /* 110x xxxx 10xx xxxx*/
- count += 2;
- if (count > utflen) {
- throw new UTFDataFormatException(
- "malformed input: partial character at end");
- }
- char2 = (int) bytearr[count - 1];
- if ((char2 & 0xC0) != 0x80) {
- throw new UTFDataFormatException(
- "malformed input around byte " + count);
- }
- chararr[chararrCount++] = (char) (((c & 0x1F) << 6) |
- (char2 & 0x3F));
- break;
- case 14:
- /* 1110 xxxx 10xx xxxx 10xx xxxx */
- count += 3;
- if (count > utflen) {
- throw new UTFDataFormatException(
- "malformed input: partial character at end");
- }
- char2 = (int) bytearr[count - 2];
- char3 = (int) bytearr[count - 1];
- if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
- throw new UTFDataFormatException(
- "malformed input around byte " + (count - 1));
- }
- chararr[chararrCount++] = (char) (((c & 0x0F) << 12) |
- ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
- break;
- default:
- /* 10xx xxxx, 1111 xxxx */
- throw new UTFDataFormatException(
- "malformed input around byte " + count);
- }
- }
- // The number of chars produced may be less than utflen
- return new String(chararr, 0, chararrCount);
- }
-
- /**
- * Get an int at an arbitrary position in a byte[]
- *
- * @param buf Buffer to get the int from
- * @param pos Position in the buffer to get the int from
- * @return Int at the buffer position
- */
- public static int getInt(byte[] buf, int pos) {
- return UNSAFE.getInt(buf,
- BYTE_ARRAY_OFFSET + pos);
+ super(buf, offset, length);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java
index 4b413da..8736590 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java
@@ -22,12 +22,21 @@ import java.io.OutputStream;
import java.lang.reflect.Field;
import java.util.Arrays;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_BOOLEAN;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_BYTE;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_CHAR;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_SHORT;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_INT;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_LONG;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_FLOAT;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_DOUBLE;
+
/**
* Byte array output stream that uses Unsafe methods to serialize/deserialize
* much faster
*/
public class UnsafeByteArrayOutputStream extends OutputStream
- implements ExtendedDataOutput {
+ implements ExtendedDataOutput {
static {
try {
Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
@@ -42,24 +51,6 @@ public class UnsafeByteArrayOutputStream extends OutputStream
}
}
- /** Bytes used in a boolean */
- public static final int SIZE_OF_BOOLEAN = 1;
- /** Bytes used in a byte */
- public static final int SIZE_OF_BYTE = 1;
- /** Bytes used in a char */
- public static final int SIZE_OF_CHAR = 2;
- /** Bytes used in a short */
- public static final int SIZE_OF_SHORT = 2;
- /** Bytes used in a medium */
- public static final int SIZE_OF_MEDIUM = 3;
- /** Bytes used in an int */
- public static final int SIZE_OF_INT = 4;
- /** Bytes used in a float */
- public static final int SIZE_OF_FLOAT = 4;
- /** Bytes used in a long */
- public static final int SIZE_OF_LONG = 8;
- /** Bytes used in a double */
- public static final int SIZE_OF_DOUBLE = 8;
/** Default number of bytes */
private static final int DEFAULT_BYTES = 32;
/** Access to the unsafe class */
@@ -68,12 +59,6 @@ public class UnsafeByteArrayOutputStream extends OutputStream
/** Offset of a byte array */
private static final long BYTE_ARRAY_OFFSET =
UNSAFE.arrayBaseOffset(byte[].class);
- /** Offset of a long array */
- private static final long LONG_ARRAY_OFFSET =
- UNSAFE.arrayBaseOffset(long[].class);
- /** Offset of a double array */
- private static final long DOUBLE_ARRAY_OFFSET =
- UNSAFE.arrayBaseOffset(double[].class);
/** Byte buffer */
private byte[] buf;
@@ -142,7 +127,15 @@ public class UnsafeByteArrayOutputStream extends OutputStream
@Override
public byte[] toByteArray() {
return Arrays.copyOf(buf, pos);
+ }
+ /**
+  * Copy a slice of the bytes written so far.
+  *
+  * @param offset Index of the first byte to copy
+  * @param length Number of bytes to copy
+  * @return New array holding bytes [offset, offset + length) of the buffer
+  * @throws IndexOutOfBoundsException If the slice extends past the bytes
+  *         written so far
+  */
+ @Override
+ public byte[] toByteArray(int offset, int length) {
+   // Sum in long so that large arguments cannot wrap around the check
+   if ((long) offset + length > pos) {
+     throw new IndexOutOfBoundsException(String.format("Offset: %d + " +
+         "Length: %d exceeds the size of buf : %d", offset, length, pos));
+   }
+   // copyOfRange takes an exclusive end index, not a length
+   return Arrays.copyOfRange(buf, offset, offset + length);
}
@Override
@@ -212,10 +205,15 @@ public class UnsafeByteArrayOutputStream extends OutputStream
}
@Override
- public void skipBytes(int bytesToSkip) {
- if ((pos + bytesToSkip) > buf.length) {
- buf = Arrays.copyOf(buf, Math.max(buf.length << 1, pos + bytesToSkip));
+ public void ensureWritable(int minSize) {
+ if ((pos + minSize) > buf.length) {
+ buf = Arrays.copyOf(buf, Math.max(buf.length << 1, pos + minSize));
}
+ }
+
+ @Override
+ public void skipBytes(int bytesToSkip) {
+ ensureWritable(bytesToSkip);
pos += bytesToSkip;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReads.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReads.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReads.java
new file mode 100644
index 0000000..5f99846
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReads.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+
+/**
+ * Shared base of the Unsafe-backed readers. It keeps the read position and
+ * the buffer length, and implements the parts of {@link ExtendedDataInput}
+ * that do not depend on how bytes are fetched: the remaining-bytes check,
+ * skipping, and line / modified UTF-8 decoding. Subclasses supply the
+ * primitive reads together with {@link #available()} and {@link #getPos()}.
+ */
+public abstract class UnsafeReads implements ExtendedDataInput {
+
+  /** Buffer length */
+  protected final int bufLength;
+  /** Position in the buffer */
+  protected long pos = 0;
+
+  /**
+   * Constructor
+   *
+   * @param length buf length
+   */
+  public UnsafeReads(int length) {
+    bufLength = length;
+  }
+
+  /**
+   * Constructor with offset
+   *
+   * @param offset offset in memory, used as the initial position
+   * @param length buf length
+   */
+  public UnsafeReads(long offset, int length) {
+    pos = offset;
+    bufLength = length;
+  }
+
+  /**
+   * How many bytes are still available?
+   *
+   * @return Number of bytes available
+   */
+  public abstract int available();
+
+  /**
+   * What position in the stream?
+   *
+   * @return Position
+   */
+  public abstract int getPos();
+
+  /**
+   * Check whether there are enough remaining bytes for an operation
+   *
+   * @param requiredBytes Bytes required to read
+   * @throws IOException When there are not enough bytes to read
+   */
+  protected void ensureRemaining(int requiredBytes) throws IOException {
+    if (available() < requiredBytes) {
+      throw new IOException("ensureRemaining: Only " + available() +
+          " bytes remaining, trying to read " + requiredBytes);
+    }
+  }
+
+  @Override
+  public int skipBytes(int n) throws IOException {
+    ensureRemaining(n);
+    pos += n;
+    return n;
+  }
+
+  /**
+   * Read bytes up to the next line terminator ('\n', '\r' or "\r\n") or
+   * the end of the data, turning every byte into the char with the same
+   * unsigned value.
+   *
+   * @return The line without its terminator, or null when no byte was
+   *         left to read
+   * @throws IOException If a byte cannot be read
+   */
+  @Override
+  public String readLine() throws IOException {
+    // Note that this code is mostly copied from DataInputStream
+    char[] tmpBuf = new char[128];
+
+    int room = tmpBuf.length;
+    int offset = 0;
+    int c;
+
+    loop:
+    while (true) {
+      // Behave like InputStream.read(): an unsigned byte, or -1 once the
+      // data is exhausted. Using the signed readByte() here would confuse
+      // a 0xFF byte with the end marker and would throw at the real end.
+      if (available() > 0) {
+        c = readUnsignedByte();
+      } else {
+        c = -1;
+      }
+      switch (c) {
+      case -1:
+      case '\n':
+        break loop;
+      case '\r':
+        // Swallow the '\n' of a "\r\n" pair, un-read any other byte
+        if (available() > 0 && readUnsignedByte() != '\n') {
+          pos -= 1;
+        }
+        break loop;
+      default:
+        if (--room < 0) {
+          // Out of room: grow the char buffer by another 128 entries
+          char[] replacebuf = new char[offset + 128];
+          room = replacebuf.length - offset - 1;
+          System.arraycopy(tmpBuf, 0, replacebuf, 0, offset);
+          tmpBuf = replacebuf;
+        }
+        tmpBuf[offset++] = (char) c;
+        break;
+      }
+    }
+    if ((c == -1) && (offset == 0)) {
+      return null;
+    }
+    return String.copyValueOf(tmpBuf, 0, offset);
+  }
+
+  /**
+   * Read a string stored as an unsigned short byte count followed by that
+   * many bytes of modified UTF-8.
+   *
+   * @return Decoded string
+   * @throws IOException If the bytes cannot be read, or a
+   *         UTFDataFormatException if they are not valid modified UTF-8
+   */
+  @Override
+  public String readUTF() throws IOException {
+    // Note that this code is mostly copied from DataInputStream
+    int utflen = readUnsignedShort();
+
+    byte[] bytearr = new byte[utflen];
+    char[] chararr = new char[utflen];
+
+    int c;
+    int char2;
+    int char3;
+    int count = 0;
+    int chararrCount = 0;
+
+    readFully(bytearr, 0, utflen);
+
+    // Fast path: copy the leading run of single-byte (ASCII) characters
+    while (count < utflen) {
+      c = (int) bytearr[count] & 0xff;
+      if (c > 127) {
+        break;
+      }
+      count++;
+      chararr[chararrCount++] = (char) c;
+    }
+
+    // General path: the high nibble of the lead byte selects the length
+    while (count < utflen) {
+      c = (int) bytearr[count] & 0xff;
+      switch (c >> 4) {
+      case 0:
+      case 1:
+      case 2:
+      case 3:
+      case 4:
+      case 5:
+      case 6:
+      case 7:
+        /* 0xxxxxxx */
+        count++;
+        chararr[chararrCount++] = (char) c;
+        break;
+      case 12:
+      case 13:
+        /* 110x xxxx 10xx xxxx*/
+        count += 2;
+        if (count > utflen) {
+          throw new UTFDataFormatException(
+              "malformed input: partial character at end");
+        }
+        char2 = (int) bytearr[count - 1];
+        if ((char2 & 0xC0) != 0x80) {
+          throw new UTFDataFormatException(
+              "malformed input around byte " + count);
+        }
+        chararr[chararrCount++] = (char) (((c & 0x1F) << 6) |
+            (char2 & 0x3F));
+        break;
+      case 14:
+        /* 1110 xxxx 10xx xxxx 10xx xxxx */
+        count += 3;
+        if (count > utflen) {
+          throw new UTFDataFormatException(
+              "malformed input: partial character at end");
+        }
+        char2 = (int) bytearr[count - 2];
+        char3 = (int) bytearr[count - 1];
+        if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
+          throw new UTFDataFormatException(
+              "malformed input around byte " + (count - 1));
+        }
+        chararr[chararrCount++] = (char) (((c & 0x0F) << 12) |
+            ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
+        break;
+      default:
+        /* 10xx xxxx, 1111 xxxx */
+        throw new UTFDataFormatException(
+            "malformed input around byte " + count);
+      }
+    }
+    // The number of chars produced may be less than utflen
+    return new String(chararr, 0, chararrCount);
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteArrayMessageWrite.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteArrayMessageWrite.java b/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteArrayMessageWrite.java
deleted file mode 100644
index 8673732..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteArrayMessageWrite.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.utils;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-/** Verbose Error mesage for ByteArray based messages */
-public class VerboseByteArrayMessageWrite {
- /** Do not construct */
- protected VerboseByteArrayMessageWrite() {
- }
-
- /**
- * verboseWriteCurrentMessage
- * de-serialize, then write messages
- *
- * @param iterator iterator
- * @param out DataOutput
- * @param <I> vertexId
- * @param <M> message
- * @throws IOException
- * @throws RuntimeException
- */
- public static <I extends WritableComparable, M extends Writable> void
- verboseWriteCurrentMessage(
- ByteArrayVertexIdMessages<I, M>.VertexIdMessageIterator
- iterator, DataOutput out) throws IOException {
- try {
- iterator.getCurrentMessage().write(out);
- } catch (NegativeArraySizeException e) {
- throw new RuntimeException("The numbers of bytes sent to vertex " +
- iterator.getCurrentVertexId() + " exceeded the max capacity of " +
- "its ExtendedDataOutput. Please consider setting " +
- "giraph.useBigDataIOForMessages=true. If there are super-vertices" +
- " in the graph which receive a lot of messages (total serialized " +
- "size of messages goes beyond the maximum size of a byte array), " +
- "setting this option to true will remove that limit");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteStructMessageWrite.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteStructMessageWrite.java b/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteStructMessageWrite.java
new file mode 100644
index 0000000..aa25490
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteStructMessageWrite.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+/** Verbose error message for byte-structure based messages */
+public class VerboseByteStructMessageWrite {
+  /**
+   * Private Constructor
+   */
+  private VerboseByteStructMessageWrite() {
+  }
+
+  /**
+   * Write the iterator's current message to the output.
+   *
+   * A NegativeArraySizeException raised while writing is rethrown as a
+   * RuntimeException that names the vertex and suggests the
+   * giraph.useBigDataIOForMessages option; the original exception is kept
+   * as its cause.
+   *
+   * @param iterator Iterator positioned at the message to write
+   * @param out DataOutput to write the message to
+   * @param <I> vertexId
+   * @param <M> message
+   * @throws IOException If writing the message fails
+   * @throws RuntimeException If a NegativeArraySizeException is raised
+   */
+  public static <I extends WritableComparable, M extends Writable> void
+  verboseWriteCurrentMessage(VertexIdMessageIterator<I, M> iterator,
+      DataOutput out) throws IOException {
+    try {
+      iterator.getCurrentMessage().write(out);
+    } catch (NegativeArraySizeException e) {
+      // Chain the original exception so its stack trace is not lost
+      throw new RuntimeException(
+          negativeArraySizeMessage(iterator.getCurrentVertexId()), e);
+    }
+  }
+
+  /**
+   * Always throws a RuntimeException carrying the message to present on
+   * NegativeArraySizeException
+   *
+   * @param vertexId vertexId
+   * @param <I> vertexId type
+   */
+  public static <I extends WritableComparable> void handleNegativeArraySize(
+      I vertexId) {
+    throw new RuntimeException(negativeArraySizeMessage(vertexId));
+  }
+
+  /**
+   * Build the message to present on NegativeArraySizeException
+   *
+   * @param vertexId Vertex id to mention in the message
+   * @return Error message
+   */
+  private static String negativeArraySizeMessage(Object vertexId) {
+    return "The numbers of bytes sent to vertex " +
+        vertexId + " exceeded the max capacity of " +
+        "its ExtendedDataOutput. Please consider setting " +
+        "giraph.useBigDataIOForMessages=true. If there are super-vertices" +
+        " in the graph which receive a lot of messages (total serialized " +
+        "size of messages goes beyond the maximum size of a byte array), " +
+        "setting this option to true will remove that limit";
+  }
+}