You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by pa...@apache.org on 2014/06/08 17:25:31 UTC
[2/3] GIRAPH-907: refactor giraph code to support multiple
implementations of vertexId data (pavanka)
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java
index 5c56038..f26a888 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java
@@ -18,9 +18,6 @@
package org.apache.giraph.utils;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
@@ -33,122 +30,8 @@ import java.io.IOException;
* @param <I> Vertex id
* @param <T> Data
*/
-public abstract class ByteArrayVertexIdData<I extends WritableComparable, T>
- implements Writable, ImmutableClassesGiraphConfigurable {
- /** Extended data output */
- private ExtendedDataOutput extendedDataOutput;
- /** Configuration */
- private ImmutableClassesGiraphConfiguration<I, ?, ?> configuration;
-
- /**
- * Create a new data object.
- *
- * @return Newly-created data object.
- */
- public abstract T createData();
-
- /**
- * Write a data object to an {@link ExtendedDataOutput}.
- *
- * @param out {@link ExtendedDataOutput}
- * @param data Data object to write
- * @throws IOException
- */
- public abstract void writeData(ExtendedDataOutput out, T data)
- throws IOException;
-
- /**
- * Read a data object's fields from an {@link ExtendedDataInput}.
- *
- * @param in {@link ExtendedDataInput}
- * @param data Data object to fill in-place
- * @throws IOException
- */
- public abstract void readData(ExtendedDataInput in, T data)
- throws IOException;
-
- /**
- * Initialize the inner state. Must be called before {@code add()} is
- * called.
- */
- public void initialize() {
- extendedDataOutput = configuration.createExtendedDataOutput();
- }
-
- /**
- * Initialize the inner state, with a known size. Must be called before
- * {@code add()} is called.
- *
- * @param expectedSize Number of bytes to be expected
- */
- public void initialize(int expectedSize) {
- extendedDataOutput = configuration.createExtendedDataOutput(expectedSize);
- }
-
- /**
- * Add a vertex id and data pair to the collection.
- *
- * @param vertexId Vertex id
- * @param data Data
- */
- public void add(I vertexId, T data) {
- try {
- vertexId.write(extendedDataOutput);
- writeData(extendedDataOutput, data);
- } catch (IOException e) {
- throw new IllegalStateException("add: IOException", e);
- }
- }
-
- /**
- * Add a serialized vertex id and data.
- *
- * @param serializedId The bye array which holds the serialized id.
- * @param idPos The end position of the serialized id in the byte array.
- * @param data Data
- */
- public void add(byte[] serializedId, int idPos, T data) {
- try {
- extendedDataOutput.write(serializedId, 0, idPos);
- writeData(extendedDataOutput, data);
- } catch (IOException e) {
- throw new IllegalStateException("add: IOException", e);
- }
- }
-
- /**
- * Get the number of bytes used.
- *
- * @return Bytes used
- */
- public int getSize() {
- return extendedDataOutput.getPos();
- }
-
- /**
- * Get the size of this object in serialized form.
- *
- * @return The size (in bytes) of the serialized object
- */
- public int getSerializedSize() {
- return 1 + 4 + getSize();
- }
-
- /**
- * Check if the list is empty.
- *
- * @return Whether the list is empty
- */
- public boolean isEmpty() {
- return extendedDataOutput.getPos() == 0;
- }
-
- /**
- * Clear the list.
- */
- public void clear() {
- extendedDataOutput.reset();
- }
+public abstract class ByteArrayVertexIdData<I extends WritableComparable,
+ T> extends AbstractVertexIdData<I, T> {
/**
* Get the underlying byte-array.
@@ -160,16 +43,6 @@ public abstract class ByteArrayVertexIdData<I extends WritableComparable, T>
}
@Override
- public void setConf(ImmutableClassesGiraphConfiguration configuration) {
- this.configuration = configuration;
- }
-
- @Override
- public ImmutableClassesGiraphConfiguration<I, ?, ?> getConf() {
- return configuration;
- }
-
- @Override
public void write(DataOutput dataOutput) throws IOException {
WritableUtils.writeExtendedDataOutput(extendedDataOutput, dataOutput);
}
@@ -177,73 +50,6 @@ public abstract class ByteArrayVertexIdData<I extends WritableComparable, T>
@Override
public void readFields(DataInput dataInput) throws IOException {
extendedDataOutput =
- WritableUtils.readExtendedDataOutput(dataInput, configuration);
- }
-
- /**
- * Get an iterator over the pairs.
- *
- * @return Iterator
- */
- public VertexIdDataIterator getVertexIdDataIterator() {
- return new VertexIdDataIterator();
- }
-
- /**
- * Special iterator that reuses vertex ids and data objects so that the
- * lifetime of the object is only until next() is called.
- *
- * Vertex id ownership can be released if desired through
- * releaseCurrentVertexId(). This optimization allows us to cut down
- * on the number of objects instantiated and garbage collected.
- *
- * Not thread-safe.
- */
- public class VertexIdDataIterator extends VertexIdIterator<I> {
- /** Current data. */
- private T data;
-
- /** Default constructor. */
- public VertexIdDataIterator() {
- super(extendedDataOutput, configuration);
- }
-
- @Override
- public void next() {
- if (vertexId == null) {
- vertexId = configuration.createVertexId();
- }
- if (data == null) {
- data = createData();
- }
- try {
- vertexId.readFields(extendedDataInput);
- readData(extendedDataInput, data);
- } catch (IOException e) {
- throw new IllegalStateException("next: IOException", e);
- }
- }
-
- /**
- * Get the current data.
- *
- * @return Current data
- */
- public T getCurrentData() {
- return data;
- }
-
- /**
- * Release the current data object.
- *
- * @return Released data object
- */
- public T releaseCurrentData() {
- T releasedData = data;
- data = null;
- return releasedData;
- }
+ WritableUtils.readExtendedDataOutput(dataInput, getConf());
}
-
}
-
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
index 762802b..16370c7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
@@ -33,7 +33,8 @@ import java.io.IOException;
*/
@SuppressWarnings("unchecked")
public class ByteArrayVertexIdEdges<I extends WritableComparable,
- E extends Writable> extends ByteArrayVertexIdData<I, Edge<I, E>> {
+ E extends Writable> extends ByteArrayVertexIdData<I, Edge<I,
+ E>> implements VertexIdEdges<I, E> {
/**
* Cast the {@link ImmutableClassesGiraphConfiguration} so it can be used
* to generate edge objects.
@@ -62,37 +63,9 @@ public class ByteArrayVertexIdEdges<I extends WritableComparable,
WritableUtils.readEdge(in, edge);
}
- /**
- * Get an iterator over the pairs.
- *
- * @return Iterator
- */
- public VertexIdEdgeIterator getVertexIdEdgeIterator() {
- return new VertexIdEdgeIterator();
- }
-
- /**
- * Special iterator that reuses vertex ids and edge objects so that the
- * lifetime of the object is only until next() is called.
- */
- public class VertexIdEdgeIterator extends VertexIdDataIterator {
- /**
- * Get the current edge.
- *
- * @return Current edge
- */
- public Edge<I, E> getCurrentEdge() {
- return getCurrentData();
- }
-
- /**
- * Release the current edge.
- *
- * @return Released edge
- */
- public Edge<I, E> releaseCurrentEdge() {
- return releaseCurrentData();
- }
+ @Override
+ public ByteStructVertexIdEdgeIterator<I, E> getVertexIdEdgeIterator() {
+ return new ByteStructVertexIdEdgeIterator<>(this);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
index 0ac8fdf..b3eca3e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.giraph.utils;
import org.apache.giraph.factories.MessageValueFactory;
@@ -33,7 +34,8 @@ import java.io.IOException;
*/
@SuppressWarnings("unchecked")
public class ByteArrayVertexIdMessages<I extends WritableComparable,
- M extends Writable> extends ByteArrayVertexIdData<I, M> {
+ M extends Writable> extends ByteArrayVertexIdData<I, M>
+ implements VertexIdMessages<I, M> {
/** Message value class */
private MessageValueFactory<M> messageValueFactory;
/** Add the message size to the stream? (Depends on the message store) */
@@ -52,7 +54,7 @@ public class ByteArrayVertexIdMessages<I extends WritableComparable,
/**
* Set whether message sizes should be encoded. This should only be a
* possibility when not combining. When combining, all messages need to be
- * deserializd right away, so this won't help.
+ * de-serialized right away, so this won't help.
*/
private void setUseMessageSizeEncoding() {
if (!getConf().useMessageCombiner()) {
@@ -89,45 +91,31 @@ public class ByteArrayVertexIdMessages<I extends WritableComparable,
setUseMessageSizeEncoding();
}
- /**
- * Get specialized iterator that will instiantiate the vertex id and
- * message of this object.
- *
- * @return Special iterator that reuses vertex ids and messages unless
- * specified
- */
- public VertexIdMessageIterator getVertexIdMessageIterator() {
- return new VertexIdMessageIterator();
- }
-
- /**
- * Special iterator that reuses vertex ids and message objects so that the
- * lifetime of the object is only until next() is called.
- */
- public class VertexIdMessageIterator extends VertexIdDataIterator {
- /**
- * Get the current message.
- *
- * @return Current message
- */
- public M getCurrentMessage() {
- return getCurrentData();
- }
+ @Override
+ public ByteStructVertexIdMessageIterator<I, M> getVertexIdMessageIterator() {
+ return new ByteStructVertexIdMessageIterator<>(this);
}
- /**
- * Get specialized iterator that will instiantiate the vertex id and
- * message of this object. It will only produce message bytes, not actual
- * messages and expects a different encoding.
- *
- * @return Special iterator that reuses vertex ids (unless released) and
- * copies message bytes
- */
- public VertexIdMessageBytesIterator getVertexIdMessageBytesIterator() {
+ @Override
+ public ByteStructVertexIdMessageBytesIterator<I, M>
+ getVertexIdMessageBytesIterator() {
if (!useMessageSizeEncoding) {
return null;
}
- return new VertexIdMessageBytesIterator();
+ return new ByteStructVertexIdMessageBytesIterator<I, M>(this) {
+ @Override
+ public void writeCurrentMessageBytes(DataOutput dataOutput) {
+ try {
+ dataOutput.write(extendedDataOutput.getByteArray(),
+ messageOffset, messageBytes);
+ } catch (NegativeArraySizeException e) {
+ VerboseByteStructMessageWrite.handleNegativeArraySize(vertexId);
+ } catch (IOException e) {
+ throw new IllegalStateException("writeCurrentMessageBytes: Got " +
+ "IOException", e);
+ }
+ }
+ };
}
@Override
@@ -141,66 +129,4 @@ public class ByteArrayVertexIdMessages<I extends WritableComparable,
useMessageSizeEncoding = dataInput.readBoolean();
super.readFields(dataInput);
}
-
- /**
- * Special iterator that reuses vertex ids and messages bytes so that the
- * lifetime of the object is only until next() is called.
- *
- * Vertex id ownership can be released if desired through
- * releaseCurrentVertexId(). This optimization allows us to cut down
- * on the number of objects instantiated and garbage collected. Messages
- * can only be copied to an ExtendedDataOutput object
- *
- * Not thread-safe.
- */
- public class VertexIdMessageBytesIterator extends VertexIdDataIterator {
- /** Last message offset */
- private int messageOffset = -1;
- /** Number of bytes in the last message */
- private int messageBytes = -1;
-
- /**
- * Moves to the next element in the iteration.
- */
- @Override
- public void next() {
- if (vertexId == null) {
- vertexId = getConf().createVertexId();
- }
-
- try {
- vertexId.readFields(extendedDataInput);
- messageBytes = extendedDataInput.readInt();
- messageOffset = extendedDataInput.getPos();
- if (extendedDataInput.skipBytes(messageBytes) != messageBytes) {
- throw new IllegalStateException("next: Failed to skip " +
- messageBytes);
- }
- } catch (IOException e) {
- throw new IllegalStateException("next: IOException", e);
- }
- }
-
- /**
- * Write the current message to an ExtendedDataOutput object
- *
- * @param dataOutput Where the current message will be written to
- */
- public void writeCurrentMessageBytes(DataOutput dataOutput) {
- try {
- dataOutput.write(getByteArray(), messageOffset, messageBytes);
- } catch (NegativeArraySizeException e) {
- throw new RuntimeException("The numbers of bytes sent to vertex " +
- vertexId + " exceeded the max capacity of " +
- "its ExtendedDataOutput. Please consider setting " +
- "giraph.useBigDataIOForMessages=true. If there are super-vertices" +
- " in the graph which receive a lot of messages (total serialized " +
- "size of messages goes beyond the maximum size of a byte array), " +
- "setting this option to true will remove that limit");
- } catch (IOException e) {
- throw new IllegalStateException("writeCurrentMessageBytes: Got " +
- "IOException", e);
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterable.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterable.java
new file mode 100644
index 0000000..4e15c1b
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterable.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.util.Iterator;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * This iterable is designed to deserialize a byte array on the fly to
+ * provide new copies of writable objects when desired. It does not reuse
+ * objects, and instead creates a new one for every next().
+ *
+ * @param <T> Type that extends Writable that will be iterated
+ */
+public abstract class ByteStructIterable<T extends Writable> implements
+ Iterable<T> {
+ /** Factory for data input */
+ protected final Factory<? extends ExtendedDataInput> dataInputFactory;
+
+ /**
+ * Constructor
+ *
+ * @param dataInputFactory Factory for data inputs
+ */
+ public ByteStructIterable(
+ Factory<? extends ExtendedDataInput> dataInputFactory) {
+ this.dataInputFactory = dataInputFactory;
+ }
+
+ /**
+ * Must be able to create the writable object
+ *
+ * @return New writable
+ */
+ protected abstract T createWritable();
+
+ @Override
+ public Iterator<T> iterator() {
+ return new ByteStructIterator<T>(dataInputFactory.create()) {
+ @Override
+ protected T createWritable() {
+ return ByteStructIterable.this.createWritable();
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterator.java
new file mode 100644
index 0000000..322365c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterator.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * This iterator is designed to deserialize a byte array on the fly to
+ * provide new copies of writable objects when desired. It does not reuse
+ * objects, and instead creates a new one for every next().
+ *
+ * @param <T> Type that extends Writable that will be iterated
+ */
+public abstract class ByteStructIterator<T extends Writable> implements
+ Iterator<T> {
+ /** Data input */
+ protected final ExtendedDataInput extendedDataInput;
+
+ /**
+ * Wrap ExtendedDataInput in ByteStructIterator
+ *
+ * @param extendedDataInput ExtendedDataInput
+ */
+ public ByteStructIterator(ExtendedDataInput extendedDataInput) {
+ this.extendedDataInput = extendedDataInput;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return extendedDataInput.available() > 0;
+ }
+
+ @Override
+ public T next() {
+ T writable = createWritable();
+ try {
+ writable.readFields(extendedDataInput);
+ } catch (IOException e) {
+ throw new IllegalStateException("next: readFields got IOException", e);
+ }
+ return writable;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("remove: Not supported");
+ }
+
+ /**
+ * Must be able to create the writable object
+ *
+ * @return New writable
+ */
+ protected abstract T createWritable();
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdDataIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdDataIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdDataIterator.java
new file mode 100644
index 0000000..cefec0e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdDataIterator.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+import javax.annotation.concurrent.NotThreadSafe;
+
+/**
+ * Special iterator that reuses vertex ids and data objects so that the
+ * lifetime of the object is only until next() is called.
+ *
+ * Vertex id ownership can be released if desired through
+ * releaseCurrentVertexId(). This optimization allows us to cut down
+ * on the number of objects instantiated and garbage collected.
+ *
+ * @param <I> vertexId type parameter
+ * @param <T> vertexData type parameter
+ */
+@NotThreadSafe
+public class ByteStructVertexIdDataIterator<I extends WritableComparable, T>
+ extends ByteStructVertexIdIterator<I> implements VertexIdDataIterator<I, T> {
+ /** VertexIdData to iterate over */
+ protected AbstractVertexIdData<I, T> vertexIdData;
+ /** Current data. */
+ private T data;
+
+ /**
+ * Constructor
+ *
+ * @param vertexIdData Vertex id data to iterate over
+ */
+ public ByteStructVertexIdDataIterator(
+ AbstractVertexIdData<I, T> vertexIdData) {
+ super(vertexIdData.extendedDataOutput, vertexIdData.getConf());
+ this.vertexIdData = vertexIdData;
+ }
+
+ @Override
+ public void next() {
+ if (vertexId == null) {
+ vertexId = vertexIdData.getConf().createVertexId();
+ }
+ if (data == null) {
+ data = vertexIdData.createData();
+ }
+ try {
+ vertexId.readFields(extendedDataInput);
+ vertexIdData.readData(extendedDataInput, data);
+ } catch (IOException e) {
+ throw new IllegalStateException("next: IOException", e);
+ }
+ }
+
+ @Override
+ public T getCurrentData() {
+ return data;
+ }
+
+ @Override
+ public T releaseCurrentData() {
+ T releasedData = data;
+ data = null;
+ return releasedData;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdEdgeIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdEdgeIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdEdgeIterator.java
new file mode 100644
index 0000000..7e06038
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdEdgeIterator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Special iterator that reuses vertex ids and edge objects so that the
+ * lifetime of the object is only until next() is called.
+ *
+ * @param <I> vertexId type parameter
+ * @param <E> edge type parameter
+ */
+public class ByteStructVertexIdEdgeIterator<I extends WritableComparable,
+ E extends Writable> extends ByteStructVertexIdDataIterator<I, Edge<I, E>>
+ implements VertexIdEdgeIterator<I, E> {
+
+ /**
+ * Constructor
+ *
+ * @param vertexIdData Vertex id and edge data to iterate over
+ */
+ public ByteStructVertexIdEdgeIterator(
+ AbstractVertexIdData<I, Edge<I, E>> vertexIdData) {
+ super(vertexIdData);
+ }
+
+ @Override
+ public Edge<I, E> getCurrentEdge() {
+ return getCurrentData();
+ }
+
+
+ @Override
+ public Edge<I, E> releaseCurrentEdge() {
+ return releaseCurrentData();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdIterator.java
new file mode 100644
index 0000000..3d564cd
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdIterator.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Common implementation for VertexIdEdgeIterator, VertexIdMessageIterator
+ * and VertexIdMessageBytesIterator.
+ *
+ * @param <I> Vertex id
+ */
+public abstract class ByteStructVertexIdIterator<I extends WritableComparable>
+ implements VertexIdIterator<I> {
+ /** Reader of the serialized vertex id data (edges or messages) */
+ protected final ExtendedDataInput extendedDataInput;
+
+ /** Current vertex id */
+ protected I vertexId;
+
+ /**
+ * Constructor.
+ *
+ * @param extendedDataOutput Extended data output
+ * @param conf Configuration
+ */
+ public ByteStructVertexIdIterator(
+ ExtendedDataOutput extendedDataOutput,
+ ImmutableClassesGiraphConfiguration<I, ?, ?> conf) {
+ if (extendedDataOutput != null && conf != null) {
+ extendedDataInput = conf.createExtendedDataInput(extendedDataOutput);
+ } else {
+ throw new IllegalStateException("Cannot instantiate vertexIdIterator " +
+ "with null arguments");
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return extendedDataInput.available() > 0;
+ }
+
+ @Override
+ public I getCurrentVertexId() {
+ return vertexId;
+ }
+
+ @Override
+ public I releaseCurrentVertexId() {
+ I releasedVertexId = vertexId;
+ vertexId = null;
+ return releasedVertexId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageBytesIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageBytesIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageBytesIterator.java
new file mode 100644
index 0000000..c0a86c9
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageBytesIterator.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+import javax.annotation.concurrent.NotThreadSafe;
+
+/**
+ * Special iterator that reuses vertex ids and messages bytes so that the
+ * lifetime of the object is only until next() is called.
+ *
+ * Vertex id ownership can be released if desired through
+ * releaseCurrentVertexId(). This optimization allows us to cut down
+ * on the number of objects instantiated and garbage collected. Messages
+ * can only be copied to a DataOutput object.
+ *
+ * @param <I> vertexId type parameter
+ * @param <M> message type parameter
+ */
+@NotThreadSafe
+public abstract class ByteStructVertexIdMessageBytesIterator<I
+ extends WritableComparable, M extends Writable>
+ extends ByteStructVertexIdDataIterator<I, M>
+ implements VertexIdMessageBytesIterator<I, M> {
+ /** Last message offset */
+ protected int messageOffset = -1;
+ /** Number of bytes in the last message */
+ protected int messageBytes = -1;
+
+ /**
+ * Constructor with vertexIdData
+ *
+ * @param vertexIdData Vertex id and message data to iterate over
+ */
+ public ByteStructVertexIdMessageBytesIterator(
+ AbstractVertexIdData<I, M> vertexIdData) {
+ super(vertexIdData);
+ }
+
+ /**
+ * Moves to the next element in the iteration.
+ */
+ @Override
+ public void next() {
+ if (vertexId == null) {
+ vertexId = vertexIdData.getConf().createVertexId();
+ }
+
+ try {
+ vertexId.readFields(extendedDataInput);
+ messageBytes = extendedDataInput.readInt();
+ messageOffset = extendedDataInput.getPos();
+ if (extendedDataInput.skipBytes(messageBytes) != messageBytes) {
+ throw new IllegalStateException("next: Failed to skip " +
+ messageBytes);
+ }
+ } catch (IOException e) {
+ throw new IllegalStateException("next: IOException", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageIterator.java
new file mode 100644
index 0000000..b686211
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageIterator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Special iterator that reuses vertex ids and message objects so that the
+ * lifetime of the object is only until next() is called.
+ *
+ * @param <I> vertexId type parameter
+ * @param <M> message type parameter
+ */
+public class ByteStructVertexIdMessageIterator<I extends WritableComparable,
+ M extends Writable> extends ByteStructVertexIdDataIterator<I, M>
+ implements VertexIdMessageIterator<I, M> {
+
+ /**
+ * Constructor with vertexIdData
+ *
+ * @param vertexIdData vertexIdData
+ */
+ public ByteStructVertexIdMessageIterator(
+ AbstractVertexIdData<I, M> vertexIdData) {
+ super(vertexIdData);
+ }
+
+ @Override
+ public M getCurrentMessage() {
+ return getCurrentData();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteUtils.java
new file mode 100644
index 0000000..52d959e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteUtils.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+/**
+ * Utilities class for byte operations and constants
+ */
+public class ByteUtils {
+ /** Bytes used in a boolean */
+ public static final int SIZE_OF_BOOLEAN = 1;
+ /** Bytes used in a byte */
+ public static final int SIZE_OF_BYTE = 1;
+ /** Bytes used in a char */
+ public static final int SIZE_OF_CHAR = Character.SIZE / Byte.SIZE;
+ /** Bytes used in a short */
+ public static final int SIZE_OF_SHORT = Short.SIZE / Byte.SIZE;
+ /** Bytes used in an int */
+ public static final int SIZE_OF_INT = Integer.SIZE / Byte.SIZE;
+ /** Bytes used in a long */
+ public static final int SIZE_OF_LONG = Long.SIZE / Byte.SIZE;
+ /** Bytes used in a float */
+ public static final int SIZE_OF_FLOAT = Float.SIZE / Byte.SIZE;
+ /** Bytes used in a double */
+ public static final int SIZE_OF_DOUBLE = Double.SIZE / Byte.SIZE;
+
+ /**
+ * Private Constructor
+ */
+ private ByteUtils() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java
index 0ecea77..3eae25b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java
@@ -26,7 +26,7 @@ import java.io.IOException;
* Provides access to a internals of ByteArrayInputStream
*/
public class ExtendedByteArrayDataInput extends ByteArrayInputStream
- implements ExtendedDataInput {
+ implements ExtendedDataInput {
/** Internal data input */
private final DataInput dataInput;
/**
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java
index 0ff366d..9988b15 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java
@@ -140,10 +140,15 @@ public class ExtendedByteArrayDataOutput extends ByteArrayOutputStream
}
@Override
- public void skipBytes(int bytesToSkip) {
- if ((count + bytesToSkip) > buf.length) {
- buf = Arrays.copyOf(buf, Math.max(buf.length << 1, count + bytesToSkip));
+ public void ensureWritable(int minSize) {
+ if ((count + minSize) > buf.length) {
+ buf = Arrays.copyOf(buf, Math.max(buf.length << 1, count + minSize));
}
+ }
+
+ @Override
+ public void skipBytes(int bytesToSkip) {
+ ensureWritable(bytesToSkip);
count += bytesToSkip;
}
@@ -161,6 +166,15 @@ public class ExtendedByteArrayDataOutput extends ByteArrayOutputStream
}
@Override
+ public byte[] toByteArray(int offset, int length) {
+ if (offset + length > count) {
+ throw new IndexOutOfBoundsException(String.format("Offset: %d + " +
+ "Length: %d exceeds the size of buf : %d", offset, length, count));
+ }
+ return Arrays.copyOfRange(buf, offset, offset + length);
+ }
+
+ @Override
public byte[] getByteArray() {
return buf;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java
index 54ef514..bc979af 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java
@@ -24,6 +24,14 @@ import java.io.DataOutput;
*/
public interface ExtendedDataOutput extends DataOutput {
/**
+ * Ensure that backing byte structure has at least minSize
+ * additional bytes
+ *
+ * @param minSize additional size required
+ */
+ void ensureWritable(int minSize);
+
+ /**
* Skip some number of bytes.
*
* @param bytesToSkip Number of bytes to skip
@@ -61,7 +69,17 @@ public interface ExtendedDataOutput extends DataOutput {
byte[] toByteArray();
/**
+ * Return a copy of a slice of the byte array
+ *
+ * @param offset offset of array
+ * @param length length of slice
+ * @return byte array
+ */
+ byte[] toByteArray(int offset, int length);
+
+ /**
* Clears the buffer
*/
void reset();
}
+
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterable.java b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterable.java
deleted file mode 100644
index 2c24e89..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterable.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.utils;
-
-import java.util.Iterator;
-import org.apache.hadoop.io.Writable;
-
-/**
- * The objects provided by the iterators generated from this object have
- * lifetimes only until next() is called. In that sense, the object
- * provided is only a representative object.
- *
- * @param <T> Type that extends Writable that will be iterated
- */
-public abstract class RepresentativeByteArrayIterable<T extends Writable>
- extends ByteArrayIterable<T> {
- /**
- * Constructor
- *
- * @param dataInputFactory Factory for data inputs
- */
- public RepresentativeByteArrayIterable(
- Factory<? extends ExtendedDataInput> dataInputFactory) {
- super(dataInputFactory);
- }
-
- /**
- * Iterator over the internal byte array
- */
- private class RepresentativeByteArrayIterableIterator extends
- RepresentativeByteArrayIterator<T> {
- /**
- * Constructor.
- *
- * @param extendedDataInput ExtendedDataInput
- */
- private RepresentativeByteArrayIterableIterator(
- ExtendedDataInput extendedDataInput) {
- super(extendedDataInput);
- }
-
- @Override
- protected T createWritable() {
- return RepresentativeByteArrayIterable.this.createWritable();
- }
- }
-
- @Override
- public Iterator<T> iterator() {
- return
- new RepresentativeByteArrayIterableIterator(dataInputFactory.create());
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterator.java
deleted file mode 100644
index d36c94f..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterator.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.utils;
-
-import java.io.IOException;
-import org.apache.hadoop.io.Writable;
-
-/**
- * The objects provided by this iterator have lifetimes only until next() is
- * called. In that sense, the object provided is only a representative object.
- *
- * @param <T> Type that extends Writable that will be iterated
- */
-public abstract class RepresentativeByteArrayIterator<T extends
- Writable> extends ByteArrayIterator<T> {
- /** Representative writable */
- private final T representativeWritable = createWritable();
-
- /**
- * Wrap ExtendedDataInput in ByteArrayIterator
- *
- * @param extendedDataInput ExtendedDataInput
- */
- public RepresentativeByteArrayIterator(ExtendedDataInput extendedDataInput) {
- super(extendedDataInput);
- }
-
- @Override
- public T next() {
- try {
- representativeWritable.readFields(extendedDataInput);
- } catch (IOException e) {
- throw new IllegalStateException("next: readFields got IOException", e);
- }
- return representativeWritable;
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteStructIterable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteStructIterable.java b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteStructIterable.java
new file mode 100644
index 0000000..0859010
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteStructIterable.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import java.util.Iterator;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * The objects provided by the iterators generated from this object have
+ * lifetimes only until next() is called. In that sense, the object
+ * provided is only a representative object.
+ *
+ * @param <T> Type that extends Writable that will be iterated
+ */
+public abstract class RepresentativeByteStructIterable<T extends Writable>
+ extends ByteStructIterable<T> {
+ /**
+ * Constructor
+ *
+ * @param dataInputFactory Factory for data inputs
+ */
+ public RepresentativeByteStructIterable(
+ Factory<? extends ExtendedDataInput> dataInputFactory) {
+ super(dataInputFactory);
+ }
+
+ @Override
+ public Iterator<T> iterator() {
+ return new RepresentativeByteStructIterator<T>(dataInputFactory.create()) {
+ @Override
+ protected T createWritable() {
+ return RepresentativeByteStructIterable.this.createWritable();
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteStructIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteStructIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteStructIterator.java
new file mode 100644
index 0000000..0bf98ad
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteStructIterator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.IOException;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * The objects provided by this iterator have lifetimes only until next() is
+ * called. In that sense, the object provided is only a representative object.
+ *
+ * @param <T> Type that extends Writable that will be iterated
+ */
+public abstract class RepresentativeByteStructIterator<T extends
+ Writable> extends ByteStructIterator<T> {
+ /** Representative writable */
+ private final T representativeWritable = createWritable();
+
+ /**
+ * Wrap ExtendedDataInput in ByteStructIterator
+ *
+ * @param extendedDataInput ExtendedDataInput
+ */
+ public RepresentativeByteStructIterator(ExtendedDataInput extendedDataInput) {
+ super(extendedDataInput);
+ }
+
+ @Override
+ public T next() {
+ try {
+ representativeWritable.readFields(extendedDataInput);
+ } catch (IOException e) {
+ throw new IllegalStateException("next: readFields got IOException", e);
+ }
+ return representativeWritable;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/RequestUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/RequestUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/RequestUtils.java
new file mode 100644
index 0000000..2c56cb8
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/RequestUtils.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import org.apache.giraph.comm.requests.WritableRequest;
+
+import java.io.IOException;
+import org.apache.log4j.Logger;
+
+/**
+ * RequestUtils utility class
+ */
+public class RequestUtils {
+ /** Logger */
+ public static final Logger LOG = Logger.getLogger(RequestUtils.class);
+
+ /**
+ * Private Constructor
+ */
+ private RequestUtils() {
+ }
+
+ /**
+ * Read the fields of a WritableRequest from the given ByteBuf
+ *
+ * @param buf ByteBuf
+ * @param request writableRequest
+ * @return properly initialized writableRequest
+ * @throws IOException
+ */
+ public static WritableRequest decodeWritableRequest(ByteBuf buf,
+ WritableRequest request) throws IOException {
+ ByteBufInputStream input = new ByteBufInputStream(buf);
+ request.readFields(input);
+ return request;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java
new file mode 100644
index 0000000..db19fda
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_BOOLEAN;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_BYTE;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_CHAR;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_SHORT;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_INT;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_LONG;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_FLOAT;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_DOUBLE;
+
+/**
+ * Byte array input stream that uses Unsafe methods to deserialize
+ * much faster
+ */
+public class UnsafeArrayReads extends UnsafeReads {
+ /** Access to the unsafe class */
+ private static final sun.misc.Unsafe UNSAFE;
+ static {
+ try {
+ Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
+ field.setAccessible(true);
+ UNSAFE = (sun.misc.Unsafe) field.get(null);
+ // Checkstyle exception due to needing to check if unsafe is allowed
+ // CHECKSTYLE: stop IllegalCatch
+ } catch (Exception e) {
+ // CHECKSTYLE: resume IllegalCatch
+ throw new RuntimeException("UnsafeArrayReads: Failed to " +
+ "get unsafe", e);
+ }
+ }
+ /** Offset of a byte array */
+ private static final long BYTE_ARRAY_OFFSET =
+ UNSAFE.arrayBaseOffset(byte[].class);
+
+ /** Byte buffer */
+ private final byte[] buf;
+
+ /**
+ * Constructor
+ *
+ * @param buf Buffer to read from
+ */
+ public UnsafeArrayReads(byte[] buf) {
+ super(buf.length);
+ this.buf = buf;
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param buf Buffer to read from
+ * @param offset Offset in the buffer to start reading from
+ * @param length Max length of the buffer to read
+ */
+ public UnsafeArrayReads(byte[] buf, int offset, int length) {
+ super(offset, length);
+ this.buf = buf;
+ }
+
+ @Override
+ public int available() {
+ return (int) (bufLength - pos);
+ }
+
+
+ @Override
+ public int getPos() {
+ return (int) pos;
+ }
+
+ @Override
+ public void readFully(byte[] b) throws IOException {
+ ensureRemaining(b.length);
+ System.arraycopy(buf, (int) pos, b, 0, b.length);
+ pos += b.length;
+ }
+
+ @Override
+ public void readFully(byte[] b, int off, int len) throws IOException {
+ ensureRemaining(len);
+ System.arraycopy(buf, (int) pos, b, off, len);
+ pos += len;
+ }
+
+ @Override
+ public boolean readBoolean() throws IOException {
+ ensureRemaining(SIZE_OF_BOOLEAN);
+ boolean value = UNSAFE.getBoolean(buf,
+ BYTE_ARRAY_OFFSET + pos);
+ pos += SIZE_OF_BOOLEAN;
+ return value;
+ }
+
+ @Override
+ public byte readByte() throws IOException {
+ ensureRemaining(SIZE_OF_BYTE);
+ byte value = UNSAFE.getByte(buf,
+ BYTE_ARRAY_OFFSET + pos);
+ pos += SIZE_OF_BYTE;
+ return value;
+ }
+
+ @Override
+ public int readUnsignedByte() throws IOException {
+ return (short) (readByte() & 0xFF);
+ }
+
+ @Override
+ public short readShort() throws IOException {
+ ensureRemaining(SIZE_OF_SHORT);
+ short value = UNSAFE.getShort(buf,
+ BYTE_ARRAY_OFFSET + pos);
+ pos += SIZE_OF_SHORT;
+ return value;
+ }
+
+ @Override
+ public int readUnsignedShort() throws IOException {
+ return readShort() & 0xFFFF;
+ }
+
+ @Override
+ public char readChar() throws IOException {
+ ensureRemaining(SIZE_OF_CHAR);
+ char value = UNSAFE.getChar(buf,
+ BYTE_ARRAY_OFFSET + pos);
+ pos += SIZE_OF_CHAR;
+ return value;
+ }
+
+ @Override
+ public int readInt() throws IOException {
+ ensureRemaining(SIZE_OF_INT);
+ int value = UNSAFE.getInt(buf,
+ BYTE_ARRAY_OFFSET + pos);
+ pos += SIZE_OF_INT;
+ return value;
+ }
+
+ @Override
+ public long readLong() throws IOException {
+ ensureRemaining(SIZE_OF_LONG);
+ long value = UNSAFE.getLong(buf,
+ BYTE_ARRAY_OFFSET + pos);
+ pos += SIZE_OF_LONG;
+ return value;
+ }
+
+ @Override
+ public float readFloat() throws IOException {
+ ensureRemaining(SIZE_OF_FLOAT);
+ float value = UNSAFE.getFloat(buf,
+ BYTE_ARRAY_OFFSET + pos);
+ pos += SIZE_OF_FLOAT;
+ return value;
+ }
+
+ @Override
+ public double readDouble() throws IOException {
+ ensureRemaining(SIZE_OF_DOUBLE);
+ double value = UNSAFE.getDouble(buf,
+ BYTE_ARRAY_OFFSET + pos);
+ pos += SIZE_OF_DOUBLE;
+ return value;
+ }
+
+ /**
+ * Get an int at an arbitrary position in a byte[]
+ *
+ * @param buf Buffer to get the int from
+ * @param pos Position in the buffer to get the int from
+ * @return Int at the buffer position
+ */
+ public static int getInt(byte[] buf, int pos) {
+ return UNSAFE.getInt(buf,
+ BYTE_ARRAY_OFFSET + pos);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayInputStream.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayInputStream.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayInputStream.java
index 20ed92b..c8a8cac 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayInputStream.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayInputStream.java
@@ -15,52 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.giraph.utils;
-import java.io.IOException;
-import java.io.UTFDataFormatException;
-import java.lang.reflect.Field;
-import org.apache.log4j.Logger;
+package org.apache.giraph.utils;
/**
- * Byte array output stream that uses Unsafe methods to serialize/deserialize
- * much faster
+ * UnsafeByteArrayInputStream
*/
-public class UnsafeByteArrayInputStream implements ExtendedDataInput {
- /** Class logger */
- private static final Logger LOG = Logger.getLogger(
- UnsafeByteArrayInputStream.class);
- /** Access to the unsafe class */
- private static final sun.misc.Unsafe UNSAFE;
- static {
- try {
- Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
- field.setAccessible(true);
- UNSAFE = (sun.misc.Unsafe) field.get(null);
- // Checkstyle exception due to needing to check if unsafe is allowed
- // CHECKSTYLE: stop IllegalCatch
- } catch (Exception e) {
- // CHECKSTYLE: resume IllegalCatch
- throw new RuntimeException("UnsafeByteArrayOutputStream: Failed to " +
- "get unsafe", e);
- }
- }
- /** Offset of a byte array */
- private static final long BYTE_ARRAY_OFFSET =
- UNSAFE.arrayBaseOffset(byte[].class);
- /** Offset of a long array */
- private static final long LONG_ARRAY_OFFSET =
- UNSAFE.arrayBaseOffset(long[].class);
- /** Offset of a double array */
- private static final long DOUBLE_ARRAY_OFFSET =
- UNSAFE.arrayBaseOffset(double[].class);
-
- /** Byte buffer */
- private final byte[] buf;
- /** Buffer length */
- private final int bufLength;
- /** Position in the buffer */
- private int pos = 0;
+public class UnsafeByteArrayInputStream extends UnsafeArrayReads {
/**
* Constructor
@@ -68,8 +29,7 @@ public class UnsafeByteArrayInputStream implements ExtendedDataInput {
* @param buf Buffer to read from
*/
public UnsafeByteArrayInputStream(byte[] buf) {
- this.buf = buf;
- this.bufLength = buf.length;
+ super(buf);
}
/**
@@ -80,275 +40,6 @@ public class UnsafeByteArrayInputStream implements ExtendedDataInput {
* @param length Max length of the buffer to read
*/
public UnsafeByteArrayInputStream(byte[] buf, int offset, int length) {
- this.buf = buf;
- this.pos = offset;
- this.bufLength = length;
- }
-
- /**
- * How many bytes are still available?
- *
- * @return Number of bytes available
- */
- public int available() {
- return bufLength - pos;
- }
-
- /**
- * What position in the stream?
- *
- * @return Position
- */
- public int getPos() {
- return pos;
- }
-
- /**
- * Check whether there are enough remaining bytes for an operation
- *
- * @param requiredBytes Bytes required to read
- * @throws IOException When there are not enough bytes to read
- */
- private void ensureRemaining(int requiredBytes) throws IOException {
- if (bufLength - pos < requiredBytes) {
- throw new IOException("ensureRemaining: Only " + (bufLength - pos) +
- " bytes remaining, trying to read " + requiredBytes);
- }
- }
-
- @Override
- public void readFully(byte[] b) throws IOException {
- ensureRemaining(b.length);
- System.arraycopy(buf, pos, b, 0, b.length);
- pos += b.length;
- }
-
- @Override
- public void readFully(byte[] b, int off, int len) throws IOException {
- ensureRemaining(len);
- System.arraycopy(buf, pos, b, off, len);
- pos += len;
- }
-
- @Override
- public int skipBytes(int n) throws IOException {
- ensureRemaining(n);
- pos += n;
- return n;
- }
-
- @Override
- public boolean readBoolean() throws IOException {
- ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_BOOLEAN);
- boolean value = UNSAFE.getBoolean(buf,
- BYTE_ARRAY_OFFSET + pos);
- pos += UnsafeByteArrayOutputStream.SIZE_OF_BOOLEAN;
- return value;
- }
-
- @Override
- public byte readByte() throws IOException {
- ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_BYTE);
- byte value = UNSAFE.getByte(buf,
- BYTE_ARRAY_OFFSET + pos);
- pos += UnsafeByteArrayOutputStream.SIZE_OF_BYTE;
- return value;
- }
-
- @Override
- public int readUnsignedByte() throws IOException {
- return (short) (readByte() & 0xFF);
- }
-
- @Override
- public short readShort() throws IOException {
- ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_SHORT);
- short value = UNSAFE.getShort(buf,
- BYTE_ARRAY_OFFSET + pos);
- pos += UnsafeByteArrayOutputStream.SIZE_OF_SHORT;
- return value;
- }
-
- @Override
- public int readUnsignedShort() throws IOException {
- return readShort() & 0xFFFF;
- }
-
- @Override
- public char readChar() throws IOException {
- ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_CHAR);
- char value = UNSAFE.getChar(buf,
- BYTE_ARRAY_OFFSET + pos);
- pos += UnsafeByteArrayOutputStream.SIZE_OF_CHAR;
- return value;
- }
-
- @Override
- public int readInt() throws IOException {
- ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_INT);
- int value = UNSAFE.getInt(buf,
- BYTE_ARRAY_OFFSET + pos);
- pos += UnsafeByteArrayOutputStream.SIZE_OF_INT;
- return value;
- }
-
- @Override
- public long readLong() throws IOException {
- ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_LONG);
- long value = UNSAFE.getLong(buf,
- BYTE_ARRAY_OFFSET + pos);
- pos += UnsafeByteArrayOutputStream.SIZE_OF_LONG;
- return value;
- }
-
- @Override
- public float readFloat() throws IOException {
- ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_FLOAT);
- float value = UNSAFE.getFloat(buf,
- BYTE_ARRAY_OFFSET + pos);
- pos += UnsafeByteArrayOutputStream.SIZE_OF_FLOAT;
- return value;
- }
-
- @Override
- public double readDouble() throws IOException {
- ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_DOUBLE);
- double value = UNSAFE.getDouble(buf,
- BYTE_ARRAY_OFFSET + pos);
- pos += UnsafeByteArrayOutputStream.SIZE_OF_DOUBLE;
- return value;
- }
-
- @Override
- public String readLine() throws IOException {
- // Note that this code is mostly copied from DataInputStream
- char[] tmpBuf = new char[128];
-
- int room = tmpBuf.length;
- int offset = 0;
- int c;
-
- loop:
- while (true) {
- c = readByte();
- switch (c) {
- case -1:
- case '\n':
- break loop;
- case '\r':
- int c2 = readByte();
- if ((c2 != '\n') && (c2 != -1)) {
- pos -= 1;
- }
- break loop;
- default:
- if (--room < 0) {
- char[] replacebuf = new char[offset + 128];
- room = replacebuf.length - offset - 1;
- System.arraycopy(tmpBuf, 0, replacebuf, 0, offset);
- tmpBuf = replacebuf;
- }
- tmpBuf[offset++] = (char) c;
- break;
- }
- }
- if ((c == -1) && (offset == 0)) {
- return null;
- }
- return String.copyValueOf(tmpBuf, 0, offset);
- }
-
- @Override
- public String readUTF() throws IOException {
- // Note that this code is mostly copied from DataInputStream
- int utflen = readUnsignedShort();
-
- byte[] bytearr = new byte[utflen];
- char[] chararr = new char[utflen];
-
- int c;
- int char2;
- int char3;
- int count = 0;
- int chararrCount = 0;
-
- readFully(bytearr, 0, utflen);
-
- while (count < utflen) {
- c = (int) bytearr[count] & 0xff;
- if (c > 127) {
- break;
- }
- count++;
- chararr[chararrCount++] = (char) c;
- }
-
- while (count < utflen) {
- c = (int) bytearr[count] & 0xff;
- switch (c >> 4) {
- case 0:
- case 1:
- case 2:
- case 3:
- case 4:
- case 5:
- case 6:
- case 7:
- /* 0xxxxxxx */
- count++;
- chararr[chararrCount++] = (char) c;
- break;
- case 12:
- case 13:
- /* 110x xxxx 10xx xxxx*/
- count += 2;
- if (count > utflen) {
- throw new UTFDataFormatException(
- "malformed input: partial character at end");
- }
- char2 = (int) bytearr[count - 1];
- if ((char2 & 0xC0) != 0x80) {
- throw new UTFDataFormatException(
- "malformed input around byte " + count);
- }
- chararr[chararrCount++] = (char) (((c & 0x1F) << 6) |
- (char2 & 0x3F));
- break;
- case 14:
- /* 1110 xxxx 10xx xxxx 10xx xxxx */
- count += 3;
- if (count > utflen) {
- throw new UTFDataFormatException(
- "malformed input: partial character at end");
- }
- char2 = (int) bytearr[count - 2];
- char3 = (int) bytearr[count - 1];
- if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
- throw new UTFDataFormatException(
- "malformed input around byte " + (count - 1));
- }
- chararr[chararrCount++] = (char) (((c & 0x0F) << 12) |
- ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
- break;
- default:
- /* 10xx xxxx, 1111 xxxx */
- throw new UTFDataFormatException(
- "malformed input around byte " + count);
- }
- }
- // The number of chars produced may be less than utflen
- return new String(chararr, 0, chararrCount);
- }
-
- /**
- * Get an int at an arbitrary position in a byte[]
- *
- * @param buf Buffer to get the int from
- * @param pos Position in the buffer to get the int from
- * @return Int at the buffer position
- */
- public static int getInt(byte[] buf, int pos) {
- return UNSAFE.getInt(buf,
- BYTE_ARRAY_OFFSET + pos);
+ super(buf, offset, length);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java
index 4b413da..8736590 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java
@@ -22,12 +22,21 @@ import java.io.OutputStream;
import java.lang.reflect.Field;
import java.util.Arrays;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_BOOLEAN;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_BYTE;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_CHAR;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_SHORT;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_INT;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_LONG;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_FLOAT;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_DOUBLE;
+
/**
* Byte array output stream that uses Unsafe methods to serialize/deserialize
* much faster
*/
public class UnsafeByteArrayOutputStream extends OutputStream
- implements ExtendedDataOutput {
+ implements ExtendedDataOutput {
static {
try {
Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
@@ -42,24 +51,6 @@ public class UnsafeByteArrayOutputStream extends OutputStream
}
}
- /** Bytes used in a boolean */
- public static final int SIZE_OF_BOOLEAN = 1;
- /** Bytes used in a byte */
- public static final int SIZE_OF_BYTE = 1;
- /** Bytes used in a char */
- public static final int SIZE_OF_CHAR = 2;
- /** Bytes used in a short */
- public static final int SIZE_OF_SHORT = 2;
- /** Bytes used in a medium */
- public static final int SIZE_OF_MEDIUM = 3;
- /** Bytes used in an int */
- public static final int SIZE_OF_INT = 4;
- /** Bytes used in a float */
- public static final int SIZE_OF_FLOAT = 4;
- /** Bytes used in a long */
- public static final int SIZE_OF_LONG = 8;
- /** Bytes used in a double */
- public static final int SIZE_OF_DOUBLE = 8;
/** Default number of bytes */
private static final int DEFAULT_BYTES = 32;
/** Access to the unsafe class */
@@ -68,12 +59,6 @@ public class UnsafeByteArrayOutputStream extends OutputStream
/** Offset of a byte array */
private static final long BYTE_ARRAY_OFFSET =
UNSAFE.arrayBaseOffset(byte[].class);
- /** Offset of a long array */
- private static final long LONG_ARRAY_OFFSET =
- UNSAFE.arrayBaseOffset(long[].class);
- /** Offset of a double array */
- private static final long DOUBLE_ARRAY_OFFSET =
- UNSAFE.arrayBaseOffset(double[].class);
/** Byte buffer */
private byte[] buf;
@@ -142,7 +127,15 @@ public class UnsafeByteArrayOutputStream extends OutputStream
@Override
public byte[] toByteArray() {
+ // Arrays.copyOf truncates to pos, so only the first pos bytes are returned
return Arrays.copyOf(buf, pos);
+ }
+  /**
+   * Copy a sub-range of the first pos bytes of the buffer into a new array.
+   *
+   * @param offset Index in the buffer of the first byte to copy
+   * @param length Number of bytes to copy
+   * @return New array holding buf[offset] .. buf[offset + length - 1]
+   * @throws IndexOutOfBoundsException If offset + length is larger than pos
+   */
+  @Override
+  public byte[] toByteArray(int offset, int length) {
+    if (offset + length > pos) {
+      throw new IndexOutOfBoundsException(String.format("Offset: %d + " +
+        "Length: %d exceeds the size of buf : %d", offset, length, pos));
+    }
+    // Arrays.copyOfRange takes an exclusive end index, not a length
+    return Arrays.copyOfRange(buf, offset, offset + length);
   }
@Override
@@ -212,10 +205,15 @@ public class UnsafeByteArrayOutputStream extends OutputStream
}
@Override
- public void skipBytes(int bytesToSkip) {
- if ((pos + bytesToSkip) > buf.length) {
- buf = Arrays.copyOf(buf, Math.max(buf.length << 1, pos + bytesToSkip));
+ public void ensureWritable(int minSize) {
+ // Grow only when minSize more bytes would not fit after pos; the new
+ // capacity is the larger of double the current one and pos + minSize
+ if ((pos + minSize) > buf.length) {
+ buf = Arrays.copyOf(buf, Math.max(buf.length << 1, pos + minSize));
}
+ }
+
+ @Override
+ public void skipBytes(int bytesToSkip) {
+ // Reserve the space first so the advanced pos stays within buf
+ ensureWritable(bytesToSkip);
pos += bytesToSkip;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReads.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReads.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReads.java
new file mode 100644
index 0000000..5f99846
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReads.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+
+/**
+ * Common base class for {@link ExtendedDataInput} implementations that
+ * deserialize with Unsafe methods. It keeps the read position and buffer
+ * length and implements the reads that are built on top of the primitive
+ * reads supplied by subclasses (skipBytes, readLine and readUTF).
+ */
+public abstract class UnsafeReads implements ExtendedDataInput {
+
+  /** Buffer length */
+  protected final int bufLength;
+  /** Position in the buffer */
+  protected long pos = 0;
+
+  /**
+   * Constructor
+   *
+   * @param length buf length
+   */
+  public UnsafeReads(int length) {
+    bufLength = length;
+  }
+
+  /**
+   * Constructor with offset
+   *
+   * @param offset offset in memory, used as the initial position
+   * @param length buf length
+   */
+  public UnsafeReads(long offset, int length) {
+    pos = offset;
+    bufLength = length;
+  }
+
+  /**
+   * How many bytes are still available?
+   *
+   * @return Number of bytes available
+   */
+  public abstract int available();
+
+  /**
+   * What position in the stream?
+   *
+   * @return Position
+   */
+  public abstract int getPos();
+
+  /**
+   * Check whether there are enough remaining bytes for an operation
+   *
+   * @param requiredBytes Bytes required to read
+   * @throws IOException When there are not enough bytes to read
+   */
+  protected void ensureRemaining(int requiredBytes) throws IOException {
+    if (available() < requiredBytes) {
+      throw new IOException("ensureRemaining: Only " + available() +
+          " bytes remaining, trying to read " + requiredBytes);
+    }
+  }
+
+  /**
+   * Skip exactly n bytes.
+   *
+   * @param n Number of bytes to skip
+   * @return Number of bytes skipped, always n
+   * @throws IOException When fewer than n bytes are available
+   */
+  @Override
+  public int skipBytes(int n) throws IOException {
+    ensureRemaining(n);
+    pos += n;
+    return n;
+  }
+
+  /**
+   * Read bytes up to the next line terminator ('\n', '\r' or "\r\n") or the
+   * end of the input, converting every byte to a char by zero-extension.
+   *
+   * @return The line without its terminator, or null when the end of the
+   *         input is reached before any byte could be read
+   * @throws IOException When an underlying read fails
+   */
+  @Override
+  public String readLine() throws IOException {
+    // Note that this code is mostly copied from DataInputStream
+    char[] tmpBuf = new char[128];
+
+    int room = tmpBuf.length;
+    int offset = 0;
+    int c;
+
+    loop:
+    while (true) {
+      // Unlike InputStream.read(), the primitive reads never return -1 at
+      // the end of the input, so detect it explicitly. The unsigned read
+      // keeps a 0xFF data byte from being mistaken for the end marker and
+      // keeps bytes >= 0x80 from being sign-extended into the char.
+      c = available() > 0 ? readUnsignedByte() : -1;
+      switch (c) {
+      case -1:
+      case '\n':
+        break loop;
+      case '\r':
+        // Swallow the '\n' of a "\r\n" pair; push back any other byte
+        if (available() > 0 && readUnsignedByte() != '\n') {
+          pos -= 1;
+        }
+        break loop;
+      default:
+        if (--room < 0) {
+          // Out of room: grow the char buffer by another 128 chars
+          char[] replacebuf = new char[offset + 128];
+          room = replacebuf.length - offset - 1;
+          System.arraycopy(tmpBuf, 0, replacebuf, 0, offset);
+          tmpBuf = replacebuf;
+        }
+        tmpBuf[offset++] = (char) c;
+        break;
+      }
+    }
+    if ((c == -1) && (offset == 0)) {
+      return null;
+    }
+    return String.copyValueOf(tmpBuf, 0, offset);
+  }
+
+  /**
+   * Read a string stored as an unsigned 16-bit byte count followed by that
+   * many bytes in the modified UTF-8 encoding decoded below (1, 2 and 3
+   * byte sequences).
+   *
+   * @return Decoded string
+   * @throws UTFDataFormatException When the bytes are not a valid encoding
+   * @throws IOException When an underlying read fails
+   */
+  @Override
+  public String readUTF() throws IOException {
+    // Note that this code is mostly copied from DataInputStream
+    int utflen = readUnsignedShort();
+
+    byte[] bytearr = new byte[utflen];
+    char[] chararr = new char[utflen];
+
+    int c;
+    int char2;
+    int char3;
+    int count = 0;
+    int chararrCount = 0;
+
+    readFully(bytearr, 0, utflen);
+
+    // Fast path: copy the leading run of single-byte (ASCII) characters
+    while (count < utflen) {
+      c = (int) bytearr[count] & 0xff;
+      if (c > 127) {
+        break;
+      }
+      count++;
+      chararr[chararrCount++] = (char) c;
+    }
+
+    // General path: dispatch on the high nibble of the leading byte
+    while (count < utflen) {
+      c = (int) bytearr[count] & 0xff;
+      switch (c >> 4) {
+      case 0:
+      case 1:
+      case 2:
+      case 3:
+      case 4:
+      case 5:
+      case 6:
+      case 7:
+        /* 0xxxxxxx */
+        count++;
+        chararr[chararrCount++] = (char) c;
+        break;
+      case 12:
+      case 13:
+        /* 110x xxxx 10xx xxxx*/
+        count += 2;
+        if (count > utflen) {
+          throw new UTFDataFormatException(
+              "malformed input: partial character at end");
+        }
+        char2 = (int) bytearr[count - 1];
+        if ((char2 & 0xC0) != 0x80) {
+          throw new UTFDataFormatException(
+              "malformed input around byte " + count);
+        }
+        chararr[chararrCount++] = (char) (((c & 0x1F) << 6) |
+            (char2 & 0x3F));
+        break;
+      case 14:
+        /* 1110 xxxx 10xx xxxx 10xx xxxx */
+        count += 3;
+        if (count > utflen) {
+          throw new UTFDataFormatException(
+              "malformed input: partial character at end");
+        }
+        char2 = (int) bytearr[count - 2];
+        char3 = (int) bytearr[count - 1];
+        if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
+          throw new UTFDataFormatException(
+              "malformed input around byte " + (count - 1));
+        }
+        chararr[chararrCount++] = (char) (((c & 0x0F) << 12) |
+            ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
+        break;
+      default:
+        /* 10xx xxxx, 1111 xxxx */
+        throw new UTFDataFormatException(
+            "malformed input around byte " + count);
+      }
+    }
+    // The number of chars produced may be less than utflen
+    return new String(chararr, 0, chararrCount);
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteArrayMessageWrite.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteArrayMessageWrite.java b/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteArrayMessageWrite.java
deleted file mode 100644
index 8673732..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteArrayMessageWrite.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.utils;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-/** Verbose Error mesage for ByteArray based messages */
-public class VerboseByteArrayMessageWrite {
- /** Do not construct */
- protected VerboseByteArrayMessageWrite() {
- }
-
- /**
- * verboseWriteCurrentMessage
- * de-serialize, then write messages
- *
- * @param iterator iterator
- * @param out DataOutput
- * @param <I> vertexId
- * @param <M> message
- * @throws IOException
- * @throws RuntimeException
- */
- public static <I extends WritableComparable, M extends Writable> void
- verboseWriteCurrentMessage(
- ByteArrayVertexIdMessages<I, M>.VertexIdMessageIterator
- iterator, DataOutput out) throws IOException {
- try {
- iterator.getCurrentMessage().write(out);
- } catch (NegativeArraySizeException e) {
- throw new RuntimeException("The numbers of bytes sent to vertex " +
- iterator.getCurrentVertexId() + " exceeded the max capacity of " +
- "its ExtendedDataOutput. Please consider setting " +
- "giraph.useBigDataIOForMessages=true. If there are super-vertices" +
- " in the graph which receive a lot of messages (total serialized " +
- "size of messages goes beyond the maximum size of a byte array), " +
- "setting this option to true will remove that limit");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteStructMessageWrite.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteStructMessageWrite.java b/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteStructMessageWrite.java
new file mode 100644
index 0000000..aa25490
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteStructMessageWrite.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+/** Verbose error messages for byte-struct based vertex id messages */
+public class VerboseByteStructMessageWrite {
+  /**
+   * Private Constructor
+   */
+  private VerboseByteStructMessageWrite() {
+  }
+
+  /**
+   * verboseWriteCurrentMessage
+   * de-serialize, then write messages
+   *
+   * A NegativeArraySizeException raised while writing is rethrown as a
+   * RuntimeException that names the vertex and suggests a configuration
+   * fix; the original exception is kept as the cause.
+   *
+   * @param iterator iterator
+   * @param out DataOutput
+   * @param <I> vertexId
+   * @param <M> message
+   * @throws IOException
+   * @throws RuntimeException
+   */
+  public static <I extends WritableComparable, M extends Writable> void
+  verboseWriteCurrentMessage(VertexIdMessageIterator<I, M> iterator,
+    DataOutput out) throws IOException {
+    try {
+      iterator.getCurrentMessage().write(out);
+    } catch (NegativeArraySizeException e) {
+      // Chain the original exception so its stack trace is not lost
+      throw new RuntimeException(
+          negativeArraySizeMessage(iterator.getCurrentVertexId()), e);
+    }
+  }
+
+  /**
+   * message to present on NegativeArraySizeException
+   *
+   * Always throws; it never returns normally.
+   *
+   * @param vertexId vertexId
+   * @param <I> vertexId type
+   * @throws RuntimeException Always, carrying the verbose message
+   */
+  public static <I extends WritableComparable> void handleNegativeArraySize(
+    I vertexId) {
+    throw new RuntimeException(negativeArraySizeMessage(vertexId));
+  }
+
+  /**
+   * Build the verbose error text shared by both throw sites.
+   *
+   * @param vertexId Id of the vertex the messages were sent to
+   * @return Error message naming the vertex and the suggested option
+   */
+  private static String negativeArraySizeMessage(Object vertexId) {
+    return "The numbers of bytes sent to vertex " +
+        vertexId + " exceeded the max capacity of " +
+        "its ExtendedDataOutput. Please consider setting " +
+        "giraph.useBigDataIOForMessages=true. If there are super-vertices" +
+        " in the graph which receive a lot of messages (total serialized " +
+        "size of messages goes beyond the maximum size of a byte array), " +
+        "setting this option to true will remove that limit";
+  }
+}