You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this plain-text rendering.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/09/21 01:00:28 UTC

svn commit: r1388258 [2/2] - in /giraph/trunk: ./ src/main/java/org/apache/giraph/examples/ src/main/java/org/apache/giraph/graph/ src/main/java/org/apache/giraph/io/ src/test/java/org/apache/giraph/io/

Modified: giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexOutputFormat.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexOutputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexOutputFormat.java Thu Sep 20 23:00:27 2012
@@ -18,8 +18,7 @@
 
 package org.apache.giraph.io;
 
-import java.io.IOException;
-
+import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexOutputFormat;
 import org.apache.giraph.graph.VertexWriter;
 import org.apache.hadoop.io.Text;
@@ -31,6 +30,8 @@ import org.apache.hadoop.mapreduce.Recor
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 
+import java.io.IOException;
+
 /**
  * Abstract class that users should subclass to use their own text based
  * vertex output format.
@@ -43,38 +44,70 @@ import org.apache.hadoop.mapreduce.lib.o
 public abstract class TextVertexOutputFormat<I extends WritableComparable,
     V extends Writable, E extends Writable>
     extends VertexOutputFormat<I, V, E> {
+
   /** Uses the TextOutputFormat to do everything */
   protected TextOutputFormat<Text, Text> textOutputFormat =
       new TextOutputFormat<Text, Text>();
 
+  @Override
+  public void checkOutputSpecs(JobContext context)
+    throws IOException, InterruptedException {
+    textOutputFormat.checkOutputSpecs(context);
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    return textOutputFormat.getOutputCommitter(context);
+  }
+
+  /**
+   * The factory method which produces the {@link TextVertexWriter} used by this
+   * output format.
+   *
+   * @param context
+   *          the information about the task
+   * @return
+   *         the text vertex writer to be used
+   */
+  @Override
+  public abstract TextVertexWriter createVertexWriter(TaskAttemptContext
+      context) throws IOException, InterruptedException;
+
   /**
    * Abstract class to be implemented by the user based on their specific
    * vertex output.  Easiest to ignore the key value separator and only use
    * key instead.
-   *
-   * @param <I> Vertex index value
-   * @param <V> Vertex value
-   * @param <E> Edge value
    */
-  public abstract static class TextVertexWriter<I extends WritableComparable,
-      V extends Writable, E extends Writable> implements VertexWriter<I, V, E> {
+  protected abstract class TextVertexWriter implements VertexWriter<I, V, E> {
+    /** Internal line record writer */
+    private RecordWriter<Text, Text> lineRecordWriter;
     /** Context passed to initialize */
     private TaskAttemptContext context;
-    /** Internal line record writer */
-    private final RecordWriter<Text, Text> lineRecordWriter;
+
+    @Override
+    public void initialize(TaskAttemptContext context) throws IOException,
+           InterruptedException {
+      lineRecordWriter = createLineRecordWriter(context);
+      this.context = context;
+    }
 
     /**
-     * Initialize with the LineRecordWriter.
+     * Create the line record writer. Override this to use a different
+     * underlying record writer (useful for testing).
      *
-     * @param lineRecordWriter Line record writer from TextOutputFormat
+     * @param context
+     *          the context passed to initialize
+     * @return
+     *         the record writer to be used
+     * @throws IOException
+     *           exception that can be thrown during creation
+     * @throws InterruptedException
+     *           exception that can be thrown during creation
      */
-    public TextVertexWriter(RecordWriter<Text, Text> lineRecordWriter) {
-      this.lineRecordWriter = lineRecordWriter;
-    }
-
-    @Override
-    public void initialize(TaskAttemptContext context) throws IOException {
-      this.context = context;
+    protected RecordWriter<Text, Text> createLineRecordWriter(
+        TaskAttemptContext context) throws IOException, InterruptedException {
+      return textOutputFormat.getRecordWriter(context);
     }
 
     @Override
@@ -102,15 +135,31 @@ public abstract class TextVertexOutputFo
     }
   }
 
-  @Override
-  public void checkOutputSpecs(JobContext context)
-    throws IOException, InterruptedException {
-    textOutputFormat.checkOutputSpecs(context);
-  }
+  /**
+   * Abstract class to be implemented by the user to write a line for each
+   * vertex.
+   */
+  protected abstract class TextVertexWriterToEachLine extends TextVertexWriter {
 
-  @Override
-  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
-    throws IOException, InterruptedException {
-    return textOutputFormat.getOutputCommitter(context);
+    @SuppressWarnings("unchecked")
+    @Override
+    public final void writeVertex(Vertex vertex) throws
+      IOException, InterruptedException {
+      // Note we are writing line as key with null value
+      getRecordWriter().write(convertVertexToLine(vertex), null);
+    }
+
+    /**
+     * Writes a line for the given vertex.
+     *
+     * @param vertex
+     *          the current vertex for writing
+     * @return the text line to be written
+     * @throws IOException
+     *           exception that can be thrown while writing
+     */
+    protected abstract Text convertVertexToLine(Vertex<I, V, E, ?> vertex)
+      throws IOException;
   }
+
 }

Modified: giraph/trunk/src/test/java/org/apache/giraph/io/TestAdjacencyListTextVertexOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/io/TestAdjacencyListTextVertexOutputFormat.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/io/TestAdjacencyListTextVertexOutputFormat.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/io/TestAdjacencyListTextVertexOutputFormat.java Thu Sep 20 23:00:27 2012
@@ -19,7 +19,6 @@ package org.apache.giraph.io;
 
 import org.apache.giraph.graph.Edge;
 import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.io.AdjacencyListTextVertexOutputFormat.AdjacencyListVertexWriter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.Text;
@@ -38,7 +37,20 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 
-public class TestAdjacencyListTextVertexOutputFormat {
+public class TestAdjacencyListTextVertexOutputFormat extends
+    AdjacencyListTextVertexOutputFormat<Text, DoubleWritable, DoubleWritable> {
+
+  protected AdjacencyListTextVertexWriter createVertexWriter(
+      final RecordWriter<Text, Text> tw) {
+    AdjacencyListTextVertexWriter writer = new AdjacencyListTextVertexWriter() {
+      @Override
+      protected RecordWriter<Text, Text> createLineRecordWriter(
+          TaskAttemptContext context) throws IOException, InterruptedException {
+        return tw;
+      }
+    };
+    return writer;
+  }
 
   @Test
   public void testVertexWithNoEdges() throws IOException, InterruptedException {
@@ -53,7 +65,7 @@ public class TestAdjacencyListTextVertex
     when(vertex.getEdges()).thenReturn(new ArrayList<Text>());
 
     RecordWriter<Text, Text> tw = mock(RecordWriter.class);
-    AdjacencyListVertexWriter writer = new AdjacencyListVertexWriter(tw);
+    AdjacencyListTextVertexWriter writer = createVertexWriter(tw);
     writer.initialize(tac);
     writer.writeVertex(vertex);
 
@@ -84,7 +96,7 @@ public class TestAdjacencyListTextVertex
     when(vertex.getEdges()).thenReturn(cities);
 
     RecordWriter<Text,Text> tw = mock(RecordWriter.class);
-    AdjacencyListVertexWriter writer = new AdjacencyListVertexWriter(tw);
+    AdjacencyListTextVertexWriter writer = createVertexWriter(tw);
     writer.initialize(tac);
     writer.writeVertex(vertex);
 
@@ -97,7 +109,7 @@ public class TestAdjacencyListTextVertex
   @Test
   public void testWithDifferentDelimiter() throws IOException, InterruptedException {
     Configuration conf = new Configuration();
-    conf.set(AdjacencyListVertexWriter.LINE_TOKENIZE_VALUE, ":::");
+    conf.set(AdjacencyListTextVertexOutputFormat.LINE_TOKENIZE_VALUE, ":::");
     TaskAttemptContext tac = mock(TaskAttemptContext.class);
     when(tac.getConfiguration()).thenReturn(conf);
 
@@ -116,7 +128,7 @@ public class TestAdjacencyListTextVertex
     when(vertex.getEdges()).thenReturn(cities);
 
     RecordWriter<Text,Text> tw = mock(RecordWriter.class);
-    AdjacencyListVertexWriter writer = new AdjacencyListVertexWriter(tw);
+    AdjacencyListTextVertexWriter writer = createVertexWriter(tw);
     writer.initialize(tac);
     writer.writeVertex(vertex);
 
@@ -125,4 +137,5 @@ public class TestAdjacencyListTextVertex
     verify(tw).write(expected, null);
     verify(vertex, times(1)).getEdges();
   }
+
 }

Modified: giraph/trunk/src/test/java/org/apache/giraph/io/TestIdWithValueTextOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/io/TestIdWithValueTextOutputFormat.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/io/TestIdWithValueTextOutputFormat.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/io/TestIdWithValueTextOutputFormat.java Thu Sep 20 23:00:27 2012
@@ -19,18 +19,16 @@
 package org.apache.giraph.io;
 
 import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.io.IdWithValueTextOutputFormat.IdWithValueVertexWriter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.junit.Test;
 import org.mockito.Matchers;
 
-import static org.apache.giraph.io.IdWithValueTextOutputFormat.IdWithValueVertexWriter.LINE_TOKENIZE_VALUE;
-import static org.apache.giraph.io.IdWithValueTextOutputFormat.IdWithValueVertexWriter.REVERSE_ID_AND_VALUE;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -39,7 +37,8 @@ import static org.mockito.Mockito.when;
 import java.io.IOException;
 import java.util.ArrayList;
 
-public class TestIdWithValueTextOutputFormat {
+public class TestIdWithValueTextOutputFormat extends
+    IdWithValueTextOutputFormat<Text, DoubleWritable, Writable> {
   @Test
   public void testHappyPath() throws IOException, InterruptedException {
     Configuration conf = new Configuration();
@@ -79,8 +78,14 @@ public class TestIdWithValueTextOutputFo
     // Create empty iterator == no edges
     when(vertex.getEdges()).thenReturn(new ArrayList<Text>());
 
-    RecordWriter<Text, Text> tw = mock(RecordWriter.class);
-    IdWithValueVertexWriter writer = new IdWithValueVertexWriter(tw);
+    final RecordWriter<Text, Text> tw = mock(RecordWriter.class);
+    IdWithValueVertexWriter writer = new IdWithValueVertexWriter() {
+      @Override
+      protected RecordWriter<Text, Text> createLineRecordWriter(
+          TaskAttemptContext context) throws IOException, InterruptedException {
+        return tw;
+      }
+    };
     writer.initialize(tac);
     writer.writeVertex(vertex);
 

Modified: giraph/trunk/src/test/java/org/apache/giraph/io/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/io/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/io/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/io/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java Thu Sep 20 23:00:27 2012
@@ -29,6 +29,7 @@ import org.apache.hadoop.io.DoubleWritab
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.junit.Before;
@@ -44,7 +45,8 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 
-public class TestLongDoubleDoubleAdjacencyListVertexInputFormat {
+public class TestLongDoubleDoubleAdjacencyListVertexInputFormat extends
+    LongDoubleDoubleAdjacencyListVertexInputFormat<BooleanWritable> {
 
   private RecordReader<LongWritable, Text> rr;
   private Configuration conf;
@@ -64,13 +66,30 @@ public class TestLongDoubleDoubleAdjacen
     when(tac.getConfiguration()).thenReturn(conf);
   }
 
+  protected TextVertexReader createVertexReader(
+       RecordReader<LongWritable, Text> rr) {
+    return createVertexReader(rr, null);
+  }
+
+  protected TextVertexReader createVertexReader(
+      final RecordReader<LongWritable, Text> rr, LineSanitizer lineSanitizer) {
+    return new LongDoubleDoubleAdjacencyListVertexReader(lineSanitizer) {
+      @Override
+      protected RecordReader<LongWritable, Text> createLineRecordReader(
+          InputSplit inputSplit, TaskAttemptContext context)
+          throws IOException, InterruptedException {
+        return rr;
+      }
+    };
+  }
+
   @Test
   public void testIndexMustHaveValue() throws IOException, InterruptedException {
     String input = "123";
 
     when(rr.getCurrentValue()).thenReturn(new Text(input));
-    LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr =
-        new LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr);
+    TextVertexReader vr = createVertexReader(rr);
+
 
     vr.initialize(null, tac);
 
@@ -88,8 +107,8 @@ public class TestLongDoubleDoubleAdjacen
     String input = "99\t55.2\t100";
 
     when(rr.getCurrentValue()).thenReturn(new Text(input));
-    LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader vr =
-        new LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader(rr);
+    TextVertexReader vr = createVertexReader(rr);
+
 
     vr.initialize(null, tac);
 
@@ -107,8 +126,8 @@ public class TestLongDoubleDoubleAdjacen
     String input = "42\t0.1\t99\t0.2\t2000\t0.3\t4000\t0.4";
 
     when(rr.getCurrentValue()).thenReturn(new Text(input));
-    LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr =
-        new LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr);
+    TextVertexReader vr = createVertexReader(rr);
+
 
     vr.initialize(null, tac);
 
@@ -129,9 +148,8 @@ public class TestLongDoubleDoubleAdjacen
     String input = "12345:42.42:9999999:99.9";
 
     when(rr.getCurrentValue()).thenReturn(new Text(input));
-    conf.set(AdjacencyListVertexReader.LINE_TOKENIZE_VALUE, ":");
-    LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr =
-        new LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr);
+    conf.set(AdjacencyListTextVertexInputFormat.LINE_TOKENIZE_VALUE, ":");
+    TextVertexReader vr = createVertexReader(rr);
 
     vr.initialize(null, tac);
     assertTrue("Should have been able to read vertex", vr.nextVertex());

Modified: giraph/trunk/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java Thu Sep 20 23:00:27 2012
@@ -30,6 +30,7 @@ import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.junit.Before;
@@ -51,7 +52,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-public class TestTextDoubleDoubleAdjacencyListVertexInputFormat {
+public class TestTextDoubleDoubleAdjacencyListVertexInputFormat extends
+    TextDoubleDoubleAdjacencyListVertexInputFormat<BooleanWritable> {
 
   private RecordReader<LongWritable, Text> rr;
   private Configuration conf;
@@ -71,13 +73,29 @@ public class TestTextDoubleDoubleAdjacen
     when(tac.getConfiguration()).thenReturn(conf);
   }
 
+  protected TextVertexReader createVertexReader(
+       RecordReader<LongWritable, Text> rr) {
+    return createVertexReader(rr, null);
+  }
+
+  protected TextVertexReader createVertexReader(
+      final RecordReader<LongWritable, Text> rr, LineSanitizer lineSanitizer) {
+    return new TextDoubleDoubleAdjacencyListVertexReader(lineSanitizer) {
+      @Override
+      protected RecordReader<LongWritable, Text> createLineRecordReader(
+          InputSplit inputSplit, TaskAttemptContext context)
+          throws IOException, InterruptedException {
+        return rr;
+      }
+    };
+  }
+
   @Test
   public void testIndexMustHaveValue() throws IOException, InterruptedException {
     String input = "hi";
 
     when(rr.getCurrentValue()).thenReturn(new Text(input));
-    TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr =
-        new TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr);
+    TextVertexReader vr = createVertexReader(rr);
 
     vr.initialize(null, tac);
 
@@ -95,8 +113,7 @@ public class TestTextDoubleDoubleAdjacen
     String input = "index\t55.66\tindex2";
 
     when(rr.getCurrentValue()).thenReturn(new Text(input));
-    TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr =
-        new TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr);
+    TextVertexReader vr = createVertexReader(rr);
     vr.initialize(null, tac);
     try {
       vr.nextVertex();
@@ -153,8 +170,7 @@ public class TestTextDoubleDoubleAdjacen
     String input = "Hi\t0\tCiao\t1.123\tBomdia\t2.234\tOla\t3.345";
 
     when(rr.getCurrentValue()).thenReturn(new Text(input));
-    TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr =
-        new TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr);
+    TextVertexReader vr = createVertexReader(rr);
 
     vr.initialize(null, tac);
     assertTrue("Should have been able to add a vertex", vr.nextVertex());
@@ -172,8 +188,8 @@ public class TestTextDoubleDoubleAdjacen
   public void testLineSanitizer() throws Exception {
     String input = "Bye\t0.01\tCiao\t1.001\tTchau\t2.0001\tAdios\t3.00001";
 
-    AdjacencyListVertexReader.LineSanitizer toUpper =
-        new AdjacencyListVertexReader.LineSanitizer() {
+    AdjacencyListTextVertexInputFormat.LineSanitizer toUpper =
+        new AdjacencyListTextVertexInputFormat.LineSanitizer() {
       @Override
       public String sanitize(String s) {
         return s.toUpperCase();
@@ -181,8 +197,7 @@ public class TestTextDoubleDoubleAdjacen
     };
 
     when(rr.getCurrentValue()).thenReturn(new Text(input));
-    TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr =
-        new TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr, toUpper);
+    TextVertexReader vr = createVertexReader(rr, toUpper);
 
     vr.initialize(null, tac);
     assertTrue("Should have been able to read vertex", vr.nextVertex());
@@ -203,9 +218,8 @@ public class TestTextDoubleDoubleAdjacen
     String input = "alpha:42:beta:99";
 
     when(rr.getCurrentValue()).thenReturn(new Text(input));
-    conf.set(AdjacencyListVertexReader.LINE_TOKENIZE_VALUE, ":");
-    TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr =
-        new TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr);
+    conf.set(AdjacencyListTextVertexInputFormat.LINE_TOKENIZE_VALUE, ":");
+    TextVertexReader vr = createVertexReader(rr);
 
     vr.initialize(null, tac);
     assertTrue("Should have been able to read vertex", vr.nextVertex());