You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/09/21 01:00:28 UTC
svn commit: r1388258 [2/2] - in /giraph/trunk: ./
src/main/java/org/apache/giraph/examples/
src/main/java/org/apache/giraph/graph/ src/main/java/org/apache/giraph/io/
src/test/java/org/apache/giraph/io/
Modified: giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexOutputFormat.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexOutputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexOutputFormat.java Thu Sep 20 23:00:27 2012
@@ -18,8 +18,7 @@
package org.apache.giraph.io;
-import java.io.IOException;
-
+import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexOutputFormat;
import org.apache.giraph.graph.VertexWriter;
import org.apache.hadoop.io.Text;
@@ -31,6 +30,8 @@ import org.apache.hadoop.mapreduce.Recor
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import java.io.IOException;
+
/**
* Abstract class that users should subclass to use their own text based
* vertex output format.
@@ -43,38 +44,70 @@ import org.apache.hadoop.mapreduce.lib.o
public abstract class TextVertexOutputFormat<I extends WritableComparable,
V extends Writable, E extends Writable>
extends VertexOutputFormat<I, V, E> {
+
/** Uses the TextOutputFormat to do everything */
protected TextOutputFormat<Text, Text> textOutputFormat =
new TextOutputFormat<Text, Text>();
+ @Override
+ public void checkOutputSpecs(JobContext context)
+ throws IOException, InterruptedException {
+ textOutputFormat.checkOutputSpecs(context);
+ }
+
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return textOutputFormat.getOutputCommitter(context);
+ }
+
+ /**
+ * The factory method which produces the {@link TextVertexWriter} used by this
+ * output format.
+ *
+ * @param context
+ * the information about the task
+ * @return
+ * the text vertex writer to be used
+ */
+ @Override
+ public abstract TextVertexWriter createVertexWriter(TaskAttemptContext
+ context) throws IOException, InterruptedException;
+
/**
* Abstract class to be implemented by the user based on their specific
* vertex output. Easiest to ignore the key value separator and only use
* key instead.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
*/
- public abstract static class TextVertexWriter<I extends WritableComparable,
- V extends Writable, E extends Writable> implements VertexWriter<I, V, E> {
+ protected abstract class TextVertexWriter implements VertexWriter<I, V, E> {
+ /** Internal line record writer */
+ private RecordWriter<Text, Text> lineRecordWriter;
/** Context passed to initialize */
private TaskAttemptContext context;
- /** Internal line record writer */
- private final RecordWriter<Text, Text> lineRecordWriter;
+
+ @Override
+ public void initialize(TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ lineRecordWriter = createLineRecordWriter(context);
+ this.context = context;
+ }
/**
- * Initialize with the LineRecordWriter.
+ * Create the line record writer. Override this to use a different
+ * underlying record writer (useful for testing).
*
- * @param lineRecordWriter Line record writer from TextOutputFormat
+ * @param context
+ * the context passed to initialize
+ * @return
+ * the record writer to be used
+ * @throws IOException
+ * exception that can be thrown during creation
+ * @throws InterruptedException
+ * exception that can be thrown during creation
*/
- public TextVertexWriter(RecordWriter<Text, Text> lineRecordWriter) {
- this.lineRecordWriter = lineRecordWriter;
- }
-
- @Override
- public void initialize(TaskAttemptContext context) throws IOException {
- this.context = context;
+ protected RecordWriter<Text, Text> createLineRecordWriter(
+ TaskAttemptContext context) throws IOException, InterruptedException {
+ return textOutputFormat.getRecordWriter(context);
}
@Override
@@ -102,15 +135,31 @@ public abstract class TextVertexOutputFo
}
}
- @Override
- public void checkOutputSpecs(JobContext context)
- throws IOException, InterruptedException {
- textOutputFormat.checkOutputSpecs(context);
- }
+ /**
+ * Abstract class to be implemented by the user to write a line for each
+ * vertex.
+ */
+ protected abstract class TextVertexWriterToEachLine extends TextVertexWriter {
- @Override
- public OutputCommitter getOutputCommitter(TaskAttemptContext context)
- throws IOException, InterruptedException {
- return textOutputFormat.getOutputCommitter(context);
+ @SuppressWarnings("unchecked")
+ @Override
+ public final void writeVertex(Vertex vertex) throws
+ IOException, InterruptedException {
+ // Note we are writing line as key with null value
+ getRecordWriter().write(convertVertexToLine(vertex), null);
+ }
+
+ /**
+ * Converts the given vertex into the line of text to be written.
+ *
+ * @param vertex
+ * the current vertex for writing
+ * @return the text line to be written
+ * @throws IOException
+ * exception that can be thrown while converting the vertex
+ */
+ protected abstract Text convertVertexToLine(Vertex<I, V, E, ?> vertex)
+ throws IOException;
}
+
}
Modified: giraph/trunk/src/test/java/org/apache/giraph/io/TestAdjacencyListTextVertexOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/io/TestAdjacencyListTextVertexOutputFormat.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/io/TestAdjacencyListTextVertexOutputFormat.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/io/TestAdjacencyListTextVertexOutputFormat.java Thu Sep 20 23:00:27 2012
@@ -19,7 +19,6 @@ package org.apache.giraph.io;
import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.io.AdjacencyListTextVertexOutputFormat.AdjacencyListVertexWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
@@ -38,7 +37,20 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
-public class TestAdjacencyListTextVertexOutputFormat {
+public class TestAdjacencyListTextVertexOutputFormat extends
+ AdjacencyListTextVertexOutputFormat<Text, DoubleWritable, DoubleWritable> {
+
+ protected AdjacencyListTextVertexWriter createVertexWriter(
+ final RecordWriter<Text, Text> tw) {
+ AdjacencyListTextVertexWriter writer = new AdjacencyListTextVertexWriter() {
+ @Override
+ protected RecordWriter<Text, Text> createLineRecordWriter(
+ TaskAttemptContext context) throws IOException, InterruptedException {
+ return tw;
+ }
+ };
+ return writer;
+ }
@Test
public void testVertexWithNoEdges() throws IOException, InterruptedException {
@@ -53,7 +65,7 @@ public class TestAdjacencyListTextVertex
when(vertex.getEdges()).thenReturn(new ArrayList<Text>());
RecordWriter<Text, Text> tw = mock(RecordWriter.class);
- AdjacencyListVertexWriter writer = new AdjacencyListVertexWriter(tw);
+ AdjacencyListTextVertexWriter writer = createVertexWriter(tw);
writer.initialize(tac);
writer.writeVertex(vertex);
@@ -84,7 +96,7 @@ public class TestAdjacencyListTextVertex
when(vertex.getEdges()).thenReturn(cities);
RecordWriter<Text,Text> tw = mock(RecordWriter.class);
- AdjacencyListVertexWriter writer = new AdjacencyListVertexWriter(tw);
+ AdjacencyListTextVertexWriter writer = createVertexWriter(tw);
writer.initialize(tac);
writer.writeVertex(vertex);
@@ -97,7 +109,7 @@ public class TestAdjacencyListTextVertex
@Test
public void testWithDifferentDelimiter() throws IOException, InterruptedException {
Configuration conf = new Configuration();
- conf.set(AdjacencyListVertexWriter.LINE_TOKENIZE_VALUE, ":::");
+ conf.set(AdjacencyListTextVertexOutputFormat.LINE_TOKENIZE_VALUE, ":::");
TaskAttemptContext tac = mock(TaskAttemptContext.class);
when(tac.getConfiguration()).thenReturn(conf);
@@ -116,7 +128,7 @@ public class TestAdjacencyListTextVertex
when(vertex.getEdges()).thenReturn(cities);
RecordWriter<Text,Text> tw = mock(RecordWriter.class);
- AdjacencyListVertexWriter writer = new AdjacencyListVertexWriter(tw);
+ AdjacencyListTextVertexWriter writer = createVertexWriter(tw);
writer.initialize(tac);
writer.writeVertex(vertex);
@@ -125,4 +137,5 @@ public class TestAdjacencyListTextVertex
verify(tw).write(expected, null);
verify(vertex, times(1)).getEdges();
}
+
}
Modified: giraph/trunk/src/test/java/org/apache/giraph/io/TestIdWithValueTextOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/io/TestIdWithValueTextOutputFormat.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/io/TestIdWithValueTextOutputFormat.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/io/TestIdWithValueTextOutputFormat.java Thu Sep 20 23:00:27 2012
@@ -19,18 +19,16 @@
package org.apache.giraph.io;
import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.io.IdWithValueTextOutputFormat.IdWithValueVertexWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.junit.Test;
import org.mockito.Matchers;
-import static org.apache.giraph.io.IdWithValueTextOutputFormat.IdWithValueVertexWriter.LINE_TOKENIZE_VALUE;
-import static org.apache.giraph.io.IdWithValueTextOutputFormat.IdWithValueVertexWriter.REVERSE_ID_AND_VALUE;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -39,7 +37,8 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
-public class TestIdWithValueTextOutputFormat {
+public class TestIdWithValueTextOutputFormat extends
+ IdWithValueTextOutputFormat<Text, DoubleWritable, Writable> {
@Test
public void testHappyPath() throws IOException, InterruptedException {
Configuration conf = new Configuration();
@@ -79,8 +78,14 @@ public class TestIdWithValueTextOutputFo
// Create empty iterator == no edges
when(vertex.getEdges()).thenReturn(new ArrayList<Text>());
- RecordWriter<Text, Text> tw = mock(RecordWriter.class);
- IdWithValueVertexWriter writer = new IdWithValueVertexWriter(tw);
+ final RecordWriter<Text, Text> tw = mock(RecordWriter.class);
+ IdWithValueVertexWriter writer = new IdWithValueVertexWriter() {
+ @Override
+ protected RecordWriter<Text, Text> createLineRecordWriter(
+ TaskAttemptContext context) throws IOException, InterruptedException {
+ return tw;
+ }
+ };
writer.initialize(tac);
writer.writeVertex(vertex);
Modified: giraph/trunk/src/test/java/org/apache/giraph/io/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/io/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/io/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/io/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java Thu Sep 20 23:00:27 2012
@@ -29,6 +29,7 @@ import org.apache.hadoop.io.DoubleWritab
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.junit.Before;
@@ -44,7 +45,8 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
-public class TestLongDoubleDoubleAdjacencyListVertexInputFormat {
+public class TestLongDoubleDoubleAdjacencyListVertexInputFormat extends
+ LongDoubleDoubleAdjacencyListVertexInputFormat<BooleanWritable> {
private RecordReader<LongWritable, Text> rr;
private Configuration conf;
@@ -64,13 +66,30 @@ public class TestLongDoubleDoubleAdjacen
when(tac.getConfiguration()).thenReturn(conf);
}
+ protected TextVertexReader createVertexReader(
+ RecordReader<LongWritable, Text> rr) {
+ return createVertexReader(rr, null);
+ }
+
+ protected TextVertexReader createVertexReader(
+ final RecordReader<LongWritable, Text> rr, LineSanitizer lineSanitizer) {
+ return new LongDoubleDoubleAdjacencyListVertexReader(lineSanitizer) {
+ @Override
+ protected RecordReader<LongWritable, Text> createLineRecordReader(
+ InputSplit inputSplit, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return rr;
+ }
+ };
+ }
+
@Test
public void testIndexMustHaveValue() throws IOException, InterruptedException {
String input = "123";
when(rr.getCurrentValue()).thenReturn(new Text(input));
- LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr =
- new LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr);
+ TextVertexReader vr = createVertexReader(rr);
+
vr.initialize(null, tac);
@@ -88,8 +107,8 @@ public class TestLongDoubleDoubleAdjacen
String input = "99\t55.2\t100";
when(rr.getCurrentValue()).thenReturn(new Text(input));
- LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader vr =
- new LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader(rr);
+ TextVertexReader vr = createVertexReader(rr);
+
vr.initialize(null, tac);
@@ -107,8 +126,8 @@ public class TestLongDoubleDoubleAdjacen
String input = "42\t0.1\t99\t0.2\t2000\t0.3\t4000\t0.4";
when(rr.getCurrentValue()).thenReturn(new Text(input));
- LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr =
- new LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr);
+ TextVertexReader vr = createVertexReader(rr);
+
vr.initialize(null, tac);
@@ -129,9 +148,8 @@ public class TestLongDoubleDoubleAdjacen
String input = "12345:42.42:9999999:99.9";
when(rr.getCurrentValue()).thenReturn(new Text(input));
- conf.set(AdjacencyListVertexReader.LINE_TOKENIZE_VALUE, ":");
- LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr =
- new LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr);
+ conf.set(AdjacencyListTextVertexInputFormat.LINE_TOKENIZE_VALUE, ":");
+ TextVertexReader vr = createVertexReader(rr);
vr.initialize(null, tac);
assertTrue("Should have been able to read vertex", vr.nextVertex());
Modified: giraph/trunk/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java?rev=1388258&r1=1388257&r2=1388258&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java Thu Sep 20 23:00:27 2012
@@ -30,6 +30,7 @@ import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.junit.Before;
@@ -51,7 +52,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
-public class TestTextDoubleDoubleAdjacencyListVertexInputFormat {
+public class TestTextDoubleDoubleAdjacencyListVertexInputFormat extends
+ TextDoubleDoubleAdjacencyListVertexInputFormat<BooleanWritable> {
private RecordReader<LongWritable, Text> rr;
private Configuration conf;
@@ -71,13 +73,29 @@ public class TestTextDoubleDoubleAdjacen
when(tac.getConfiguration()).thenReturn(conf);
}
+ protected TextVertexReader createVertexReader(
+ RecordReader<LongWritable, Text> rr) {
+ return createVertexReader(rr, null);
+ }
+
+ protected TextVertexReader createVertexReader(
+ final RecordReader<LongWritable, Text> rr, LineSanitizer lineSanitizer) {
+ return new TextDoubleDoubleAdjacencyListVertexReader(lineSanitizer) {
+ @Override
+ protected RecordReader<LongWritable, Text> createLineRecordReader(
+ InputSplit inputSplit, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return rr;
+ }
+ };
+ }
+
@Test
public void testIndexMustHaveValue() throws IOException, InterruptedException {
String input = "hi";
when(rr.getCurrentValue()).thenReturn(new Text(input));
- TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr =
- new TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr);
+ TextVertexReader vr = createVertexReader(rr);
vr.initialize(null, tac);
@@ -95,8 +113,7 @@ public class TestTextDoubleDoubleAdjacen
String input = "index\t55.66\tindex2";
when(rr.getCurrentValue()).thenReturn(new Text(input));
- TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr =
- new TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr);
+ TextVertexReader vr = createVertexReader(rr);
vr.initialize(null, tac);
try {
vr.nextVertex();
@@ -153,8 +170,7 @@ public class TestTextDoubleDoubleAdjacen
String input = "Hi\t0\tCiao\t1.123\tBomdia\t2.234\tOla\t3.345";
when(rr.getCurrentValue()).thenReturn(new Text(input));
- TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr =
- new TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr);
+ TextVertexReader vr = createVertexReader(rr);
vr.initialize(null, tac);
assertTrue("Should have been able to add a vertex", vr.nextVertex());
@@ -172,8 +188,8 @@ public class TestTextDoubleDoubleAdjacen
public void testLineSanitizer() throws Exception {
String input = "Bye\t0.01\tCiao\t1.001\tTchau\t2.0001\tAdios\t3.00001";
- AdjacencyListVertexReader.LineSanitizer toUpper =
- new AdjacencyListVertexReader.LineSanitizer() {
+ AdjacencyListTextVertexInputFormat.LineSanitizer toUpper =
+ new AdjacencyListTextVertexInputFormat.LineSanitizer() {
@Override
public String sanitize(String s) {
return s.toUpperCase();
@@ -181,8 +197,7 @@ public class TestTextDoubleDoubleAdjacen
};
when(rr.getCurrentValue()).thenReturn(new Text(input));
- TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr =
- new TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr, toUpper);
+ TextVertexReader vr = createVertexReader(rr, toUpper);
vr.initialize(null, tac);
assertTrue("Should have been able to read vertex", vr.nextVertex());
@@ -203,9 +218,8 @@ public class TestTextDoubleDoubleAdjacen
String input = "alpha:42:beta:99";
when(rr.getCurrentValue()).thenReturn(new Text(input));
- conf.set(AdjacencyListVertexReader.LINE_TOKENIZE_VALUE, ":");
- TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr =
- new TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr);
+ conf.set(AdjacencyListTextVertexInputFormat.LINE_TOKENIZE_VALUE, ":");
+ TextVertexReader vr = createVertexReader(rr);
vr.initialize(null, tac);
assertTrue("Should have been able to read vertex", vr.nextVertex());