You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/01/09 20:42:24 UTC

git commit: GIRAPH-476: SequenceFileVertexOutputFormat (nitay)

Updated Branches:
  refs/heads/trunk 1684891ec -> 7a04bfd29


GIRAPH-476: SequenceFileVertexOutputFormat (nitay)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/7a04bfd2
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/7a04bfd2
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/7a04bfd2

Branch: refs/heads/trunk
Commit: 7a04bfd29dff23604f465299e0ef0f262b0da931
Parents: 1684891
Author: Nitay Joffe <ni...@apache.org>
Authored: Wed Jan 9 14:39:08 2013 -0500
Committer: Nitay Joffe <ni...@apache.org>
Committed: Wed Jan 9 14:42:23 2013 -0500

----------------------------------------------------------------------
 CHANGELOG                                          |    2 +
 .../io/formats/SequenceFileVertexOutputFormat.java |  124 +++++++++++++++
 2 files changed, 126 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/7a04bfd2/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index bfb9a55..d67df78 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-476: SequenceFileVertexOutputFormat (nitay)
+
   GIRAPH-409: Refactor / cleanups (nitay)
 
   GIRAPH-465: MapFunctions cleanup (nitay)

http://git-wip-us.apache.org/repos/asf/giraph/blob/7a04bfd2/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexOutputFormat.java
new file mode 100644
index 0000000..0538db9
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexOutputFormat.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.formats;
+
+import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.giraph.io.VertexWriter;
+import org.apache.giraph.vertex.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+
+import java.io.IOException;
+
+/**
+ * Sequence file vertex output format. It converts each vertex into a key
+ * and value pair of the desired types and writes the pair to a sequence file.
+ * A subclass has to provide the two conversion methods
+ * convertToSequenceFileKey() and convertToSequenceFileValue().
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ * @param <OK> Output key data type for a sequence file
+ * @param <OV> Output value data type for a sequence file
+ */
+public abstract class SequenceFileVertexOutputFormat<
+  I extends WritableComparable,
+  V extends Writable,
+  E extends Writable,
+  OK extends Writable,
+  OV extends Writable>
+  extends VertexOutputFormat<I, V, E> {
+  /**
+   * Wrapped sequence file output format for the desired key-value types.
+   * The spec check, committer and record writer are all obtained from it.
+   */
+  private final SequenceFileOutputFormat<OK, OV> sequenceFileOutputFormat =
+      new SequenceFileOutputFormat<OK, OV>();
+
+  @Override
+  public void checkOutputSpecs(JobContext context)
+    throws IOException, InterruptedException {
+    sequenceFileOutputFormat.checkOutputSpecs(context);
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    return sequenceFileOutputFormat.getOutputCommitter(context);
+  }
+
+  @Override
+  public VertexWriter<I, V, E> createVertexWriter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    return new SequenceFileVertexWriter();
+  }
+
+  /**
+   * Converts a vertex identifier into a sequence file key.
+   * @param vertexId Vertex identifier.
+   * @return Sequence file key.
+   */
+  protected abstract OK convertToSequenceFileKey(I vertexId);
+
+  /**
+   * Converts a vertex value into a sequence file value.
+   * @param vertexValue Vertex value.
+   * @return Sequence file value.
+   */
+  protected abstract OV convertToSequenceFileValue(V vertexValue);
+
+  /**
+   * Vertex writer that converts a vertex into a key-value pair and writes
+   * the result into a sequence file for a context.
+   */
+  private class SequenceFileVertexWriter implements VertexWriter<I, V, E> {
+    /** Record writer for the sequence file; null until initialize() runs. */
+    private RecordWriter<OK, OV> recordWriter;
+
+    @Override
+    public void initialize(TaskAttemptContext context) throws IOException,
+        InterruptedException {
+      recordWriter = sequenceFileOutputFormat.getRecordWriter(context);
+    }
+
+    @Override
+    public void writeVertex(Vertex<I, V, E, ?> vertex) throws IOException,
+        InterruptedException {
+      // Convert vertex id to type OK.
+      OK outKey = convertToSequenceFileKey(vertex.getId());
+      // Convert vertex value to type OV.
+      OV outValue = convertToSequenceFileValue(vertex.getValue());
+      recordWriter.write(outKey, outValue);
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException,
+        InterruptedException {
+      // Nothing was opened if initialize() never ran or failed part-way.
+      if (recordWriter != null) {
+        recordWriter.close(context);
+      }
+    }
+  }
+}