You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/01/09 20:42:24 UTC
git commit: GIRAPH-476: SequenceFileVertexOutputFormat (nitay)
Updated Branches:
refs/heads/trunk 1684891ec -> 7a04bfd29
GIRAPH-476: SequenceFileVertexOutputFormat (nitay)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/7a04bfd2
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/7a04bfd2
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/7a04bfd2
Branch: refs/heads/trunk
Commit: 7a04bfd29dff23604f465299e0ef0f262b0da931
Parents: 1684891
Author: Nitay Joffe <ni...@apache.org>
Authored: Wed Jan 9 14:39:08 2013 -0500
Committer: Nitay Joffe <ni...@apache.org>
Committed: Wed Jan 9 14:42:23 2013 -0500
----------------------------------------------------------------------
CHANGELOG | 2 +
.../io/formats/SequenceFileVertexOutputFormat.java | 124 +++++++++++++++
2 files changed, 126 insertions(+), 0 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/7a04bfd2/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index bfb9a55..d67df78 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-476: SequenceFileVertexOutputFormat (nitay)
+
GIRAPH-409: Refactor / cleanups (nitay)
GIRAPH-465: MapFunctions cleanup (nitay)
http://git-wip-us.apache.org/repos/asf/giraph/blob/7a04bfd2/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexOutputFormat.java
new file mode 100644
index 0000000..0538db9
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexOutputFormat.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.formats;
+
+import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.giraph.io.VertexWriter;
+import org.apache.giraph.vertex.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+
+import java.io.IOException;
+
+/**
+ * Vertex output format backed by a Hadoop sequence file. Each vertex is
+ * converted into a key-value pair of the desired types, and the pair is
+ * written to the sequence file. Subclasses supply the two conversion methods
+ * {@link #convertToSequenceFileKey} and {@link #convertToSequenceFileValue}.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ * @param <OK> Output key data type for a sequence file
+ * @param <OV> Output value data type for a sequence file
+ */
+public abstract class SequenceFileVertexOutputFormat<
+    I extends WritableComparable,
+    V extends Writable,
+    E extends Writable,
+    OK extends Writable,
+    OV extends Writable>
+    extends VertexOutputFormat<I, V, E> {
+  /**
+   * Output format of a sequence file that stores key-value pairs of the
+   * desired types. All output-format calls are delegated to it.
+   */
+  private final SequenceFileOutputFormat<OK, OV> sequenceFileOutputFormat =
+      new SequenceFileOutputFormat<OK, OV>();
+
+  @Override
+  public void checkOutputSpecs(JobContext context)
+      throws IOException, InterruptedException {
+    sequenceFileOutputFormat.checkOutputSpecs(context);
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    return sequenceFileOutputFormat.getOutputCommitter(context);
+  }
+
+  @Override
+  public VertexWriter<I, V, E> createVertexWriter(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    return new SequenceFileVertexWriter();
+  }
+
+  /**
+   * Converts a vertex identifier into a sequence file key.
+   * @param vertexId Vertex identifier.
+   * @return Sequence file key.
+   */
+  protected abstract OK convertToSequenceFileKey(I vertexId);
+
+  /**
+   * Converts a vertex value into a sequence file value.
+   * @param vertexValue Vertex value.
+   * @return Sequence file value.
+   */
+  protected abstract OV convertToSequenceFileValue(V vertexValue);
+
+  /**
+   * Vertex writer that converts a vertex into a key-value pair and writes
+   * the result into a sequence file for a context.
+   */
+  private class SequenceFileVertexWriter implements VertexWriter<I, V, E> {
+    /** Record writer for the sequence file; null until initialize() runs. */
+    private RecordWriter<OK, OV> recordWriter;
+
+    @Override
+    public void initialize(TaskAttemptContext context)
+        throws IOException, InterruptedException {
+      recordWriter = sequenceFileOutputFormat.getRecordWriter(context);
+    }
+
+    @Override
+    public final void writeVertex(Vertex<I, V, E, ?> vertex)
+        throws IOException, InterruptedException {
+      // Convert vertex id to type OK.
+      OK outKey = convertToSequenceFileKey(vertex.getId());
+      // Convert vertex value to type OV.
+      OV outValue = convertToSequenceFileValue(vertex.getValue());
+      recordWriter.write(outKey, outValue);
+    }
+
+    @Override
+    public void close(TaskAttemptContext context)
+        throws IOException, InterruptedException {
+      // The writer is still null if initialize() never ran or failed.
+      if (recordWriter != null) {
+        recordWriter.close(context);
+      }
+    }
+  }
+}