You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/10/16 01:12:19 UTC
svn commit: r1398568 - in /giraph/trunk: ./
giraph-formats-contrib/src/main/java/org/apache/giraph/io/accumulo/
giraph-formats-contrib/src/main/java/org/apache/giraph/io/hbase/
giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/
giraph-...
Author: aching
Date: Mon Oct 15 23:12:19 2012
New Revision: 1398568
URL: http://svn.apache.org/viewvc?rev=1398568&view=rev
Log:
GIRAPH-371: Replace BspUtils in giraph-formats-contrib for speed. (aching)
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hbase/HBaseVertexInputFormat.java
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java
giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java
giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java
Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1398568&r1=1398567&r2=1398568&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Mon Oct 15 23:12:19 2012
@@ -1,6 +1,9 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-371: Replace BspUtils in giraph-formats-contrib for
+ speed. (aching)
+
GIRAPH-369: bin/giraph broken (Nitay Joffe via ereisman)
GIRAPH-368: HBase Vertex I/O formats handle setConf() internally (bfem via ereisman)
Modified: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java?rev=1398568&r1=1398567&r2=1398568&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java (original)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java Mon Oct 15 23:12:19 2012
@@ -20,6 +20,7 @@ package org.apache.giraph.io.accumulo;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.VertexInputFormat;
import org.apache.giraph.graph.VertexReader;
import org.apache.hadoop.io.Writable;
@@ -71,9 +72,11 @@ public abstract class AccumuloVertexInpu
V extends Writable, E extends Writable, M extends Writable>
implements VertexReader<I, V, E, M> {
+ /** Giraph configuration */
+ private ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
/**
- * Used by subclasses to read key/value pairs.
- */
+ * Used by subclasses to read key/value pairs.
+ */
private final RecordReader<Key, Value> reader;
/** Context passed to initialize */
private TaskAttemptContext context;
@@ -86,20 +89,25 @@ public abstract class AccumuloVertexInpu
this.reader = reader;
}
- /**
- * initialize the reader.
- *
- * @param inputSplit Input split to be used for reading vertices.
- * @param context Context from the task.
- * @throws IOException
- * @throws InterruptedException
- */
+ public ImmutableClassesGiraphConfiguration<I, V, E, M> getConfiguration() {
+ return configuration;
+ }
+ /**
+ * initialize the reader.
+ *
+ * @param inputSplit Input split to be used for reading vertices.
+ * @param context Context from the task.
+ * @throws IOException
+ * @throws InterruptedException
+ */
public void initialize(InputSplit inputSplit,
TaskAttemptContext context)
throws IOException, InterruptedException {
reader.initialize(inputSplit, context);
this.context = context;
+ this.configuration = new ImmutableClassesGiraphConfiguration<I, V, E, M>(
+ context.getConfiguration());
}
/**
Modified: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hbase/HBaseVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hbase/HBaseVertexInputFormat.java?rev=1398568&r1=1398567&r2=1398568&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hbase/HBaseVertexInputFormat.java (original)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hbase/HBaseVertexInputFormat.java Mon Oct 15 23:12:19 2012
@@ -17,6 +17,7 @@
*/
package org.apache.giraph.io.hbase;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.VertexInputFormat;
import org.apache.giraph.graph.VertexReader;
import org.apache.hadoop.hbase.client.Result;
@@ -92,16 +93,16 @@ public abstract class HBaseVertexInputF
V extends Writable,
E extends Writable, M extends Writable>
implements VertexReader<I, V, E, M> {
-
- /**
- * reader instance
- */
+ /** Giraph configuration */
+ private ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
+ /** Reader instance */
private final RecordReader<ImmutableBytesWritable, Result> reader;
/** Context passed to initialize */
private TaskAttemptContext context;
/**
- * sets the base TableInputFOrmat and creates a record reader.
+ * Sets the base TableInputFormat and creates a record reader.
+ *
* @param split InputSplit
* @param context Context
* @throws IOException
@@ -112,6 +113,10 @@ public abstract class HBaseVertexInputF
this.reader = BASE_FORMAT.createRecordReader(split, context);
}
+ public ImmutableClassesGiraphConfiguration<I, V, E, M> getConfiguration() {
+ return configuration;
+ }
+
/**
* initialize
*
@@ -126,6 +131,8 @@ public abstract class HBaseVertexInputF
InterruptedException {
reader.initialize(inputSplit, context);
this.context = context;
+ this.configuration = new ImmutableClassesGiraphConfiguration<I, V, E, M>(
+ context.getConfiguration());
}
/**
Modified: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java?rev=1398568&r1=1398567&r2=1398568&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java (original)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java Mon Oct 15 23:12:19 2012
@@ -22,7 +22,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
-import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexInputFormat;
import org.apache.giraph.graph.VertexReader;
@@ -59,69 +59,74 @@ import org.apache.log4j.Logger;
@SuppressWarnings("rawtypes")
public abstract class HCatalogVertexInputFormat<
- I extends WritableComparable,
- V extends Writable,
- E extends Writable,
- M extends Writable>
- extends VertexInputFormat<I, V, E, M> {
+ I extends WritableComparable,
+ V extends Writable,
+ E extends Writable,
+ M extends Writable>
+ extends VertexInputFormat<I, V, E, M> {
/**
- * H catalog input format.
- */
+ * H catalog input format.
+ */
private HCatInputFormat hCatInputFormat = new HCatInputFormat();
@Override
public final List<InputSplit> getSplits(
- final JobContext context, final int numWorkers)
+ final JobContext context, final int numWorkers)
throws IOException, InterruptedException {
return hCatInputFormat.getSplits(context);
}
- /**
- * Abstract class that users should subclass
- * based on their specific vertex
- * input. HCatRecord can be parsed to get the
- * required data for implementing
- * getCurrentVertex(). If the vertex spans more
- * than one HCatRecord,
- * nextVertex() should be overwritten to handle that logic as well.
- */
+ /**
+ * Abstract class that users should subclass
+ * based on their specific vertex
+ * input. HCatRecord can be parsed to get the
+ * required data for implementing
+ * getCurrentVertex(). If the vertex spans more
+ * than one HCatRecord,
+ * nextVertex() should be overwritten to handle that logic as well.
+ */
protected abstract class HCatalogVertexReader implements
- VertexReader<I, V, E, M> {
-
+ VertexReader<I, V, E, M> {
+ /** Giraph configuration */
+ private ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
/** Internal HCatRecordReader. */
private RecordReader<WritableComparable,
- HCatRecord> hCatRecordReader;
-
+ HCatRecord> hCatRecordReader;
/** Context passed to initialize. */
private TaskAttemptContext context;
+ public ImmutableClassesGiraphConfiguration<I, V, E, M> getConfiguration() {
+ return configuration;
+ }
+
/**
* Initialize with the HCatRecordReader.
*
* @param recordReader internal reader
*/
private void initialize(
- final RecordReader<
- WritableComparable, HCatRecord>
- recordReader) {
+ final RecordReader<
+ WritableComparable, HCatRecord>
+ recordReader) {
this.hCatRecordReader = recordReader;
}
@Override
public final void initialize(
- final InputSplit inputSplit,
- final TaskAttemptContext ctxt)
+ final InputSplit inputSplit,
+ final TaskAttemptContext ctxt)
throws IOException, InterruptedException {
hCatRecordReader.initialize(inputSplit, ctxt);
this.context = ctxt;
+ this.configuration = new ImmutableClassesGiraphConfiguration<I, V, E, M>(
+ context.getConfiguration());
}
@Override
- public boolean nextVertex()
- throws IOException, InterruptedException {
- // Users can override this if desired,
- // and a vertex is bigger than
- // a single row.
+ public boolean nextVertex() throws IOException, InterruptedException {
+ // Users can override this if desired,
+ // and a vertex is bigger than
+ // a single row.
return hCatRecordReader.nextKeyValue();
}
@@ -131,222 +136,217 @@ public abstract class HCatalogVertexInpu
}
@Override
- public final float getProgress()
- throws IOException, InterruptedException {
+ public final float getProgress() throws IOException, InterruptedException {
return hCatRecordReader.getProgress();
}
/**
- * Get the record reader.
- * @return Record reader to be used for reading.
- */
+ * Get the record reader.
+ * @return Record reader to be used for reading.
+ */
protected final RecordReader<WritableComparable, HCatRecord>
getRecordReader() {
return hCatRecordReader;
}
- /**
- * Get the context.
- *
- *
- *
- * @return Context passed to initialize.
- */
+ /**
+ * Get the context.
+ *
+ *
+ *
+ * @return Context passed to initialize.
+ */
protected final TaskAttemptContext getContext() {
return context;
}
}
/**
- * create vertex writer instance.
- * @return HCatalogVertexReader
- */
+ * create vertex writer instance.
+ * @return HCatalogVertexReader
+ */
protected abstract HCatalogVertexReader createVertexReader();
@Override
public final VertexReader<I, V, E, M>
createVertexReader(final InputSplit split,
- final TaskAttemptContext context)
+ final TaskAttemptContext context)
throws IOException {
try {
HCatalogVertexReader reader = createVertexReader();
reader.initialize(hCatInputFormat.
- createRecordReader(split, context));
+ createRecordReader(split, context));
return reader;
} catch (InterruptedException e) {
throw new IllegalStateException(
- "createVertexReader: " +
- "Interrupted creating reader.", e);
+ "createVertexReader: " +
+ "Interrupted creating reader.", e);
}
}
- /**
- * HCatalogVertexReader for tables holding
- * complete vertex info within each
- * row.
- */
+ /**
+ * HCatalogVertexReader for tables holding
+ * complete vertex info within each
+ * row.
+ */
protected abstract class SingleRowHCatalogVertexReader
- extends HCatalogVertexReader {
+ extends HCatalogVertexReader {
/**
- * 1024 const.
- */
+ * 1024 const.
+ */
private static final int BYTE_CONST = 1024;
/**
- * logger
- */
+ * logger
+ */
private final Logger log =
- Logger.getLogger(SingleRowHCatalogVertexReader.class);
+ Logger.getLogger(SingleRowHCatalogVertexReader.class);
/**
- * record count.
- */
+ * record count.
+ */
private int recordCount = 0;
/**
- * modulus check counter.
- */
+ * modulus check counter.
+ */
private final int recordModLimit = 1000;
/**
- * get vertex id.
- * @param record hcat record
- * @return I id
- */
+ * get vertex id.
+ * @param record hcat record
+ * @return I id
+ */
protected abstract I getVertexId(HCatRecord record);
/**
- * get vertext value.
- * @param record hcat record
- * @return V value
- */
+ * get vertex value.
+ * @param record hcat record
+ * @return V value
+ */
protected abstract V getVertexValue(HCatRecord record);
/**
- * get edges.
- * @param record hcat record
- * @return Map edges
- */
+ * get edges.
+ * @param record hcat record
+ * @return Map edges
+ */
protected abstract Map<I, E> getEdges(HCatRecord record);
@Override
public final Vertex<I, V, E, M> getCurrentVertex()
throws IOException, InterruptedException {
HCatRecord record = getRecordReader().getCurrentValue();
- Vertex<I, V, E, M> vertex =
- BspUtils.createVertex(getContext()
- .getConfiguration());
+ Vertex<I, V, E, M> vertex = getConfiguration().createVertex();
vertex.initialize(getVertexId(record), getVertexValue(record),
- getEdges(record));
+ getEdges(record));
++recordCount;
if ((recordCount % recordModLimit) == 0) {
log.info("read " + recordCount + " records");
- // memory usage
+ // memory usage
Runtime runtime = Runtime.getRuntime();
double gb = BYTE_CONST *
- BYTE_CONST *
- BYTE_CONST;
+ BYTE_CONST *
+ BYTE_CONST;
log.info("Memory: " + (runtime.totalMemory() / gb) +
- "GB total = " +
- ((runtime.totalMemory() - runtime.freeMemory()) / gb) +
- "GB used + " + (runtime.freeMemory() / gb) +
- "GB free, " + (runtime.maxMemory() / gb) + "GB max");
+ "GB total = " +
+ ((runtime.totalMemory() - runtime.freeMemory()) / gb) +
+ "GB used + " + (runtime.freeMemory() / gb) +
+ "GB free, " + (runtime.maxMemory() / gb) + "GB max");
}
return vertex;
}
}
/**
- * HCatalogVertexReader for tables
- * holding vertex info across multiple rows
- * sorted by vertex id column,
- * so that they appear consecutively to the
- * RecordReader.
- */
+ * HCatalogVertexReader for tables
+ * holding vertex info across multiple rows
+ * sorted by vertex id column,
+ * so that they appear consecutively to the
+ * RecordReader.
+ */
protected abstract class MultiRowHCatalogVertexReader extends
- HCatalogVertexReader {
+ HCatalogVertexReader {
/**
- * modulus check counter.
- */
+ * modulus check counter.
+ */
private static final int RECORD_MOD_LIMIT = 1000;
/**
- * logger
- */
+ * logger
+ */
private final Logger log =
- Logger.getLogger(MultiRowHCatalogVertexReader.class);
+ Logger.getLogger(MultiRowHCatalogVertexReader.class);
/**
- * current vertex id.
- */
+ * current vertex id.
+ */
private I currentVertexId = null;
/**
- * destination edge map.
- */
+ * destination edge map.
+ */
private Map<I, E> destEdgeMap = Maps.newHashMap();
/**
- * record for vertex.
- */
+ * record for vertex.
+ */
private List<HCatRecord> recordsForVertex = Lists.newArrayList();
/**
- * record count.
- */
+ * record count.
+ */
private int recordCount = 0;
/**
- * vertex.
- *
- */
+ * vertex.
+ *
+ */
private Vertex<I, V, E, M> vertex = null;
/**
- * get vertex id from record.
- *
- * @param record hcat
- * @return I vertex id
- */
+ * get vertex id from record.
+ *
+ * @param record hcat
+ * @return I vertex id
+ */
protected abstract I getVertexId(HCatRecord record);
/**
- * get vertex value from record.
- * @param records all vertex values
- * @return V iterable of record values
- */
+ * get vertex value from record.
+ * @param records all vertex values
+ * @return V iterable of record values
+ */
protected abstract V getVertexValue(
- Iterable<HCatRecord> records);
+ Iterable<HCatRecord> records);
/**
- * get target vertex id from record.
- *
- * @param record hcat
- * @return I vertex id of target.
- */
+ * get target vertex id from record.
+ *
+ * @param record hcat
+ * @return I vertex id of target.
+ */
protected abstract I getTargetVertexId(HCatRecord record);
/**
- * get edge value from record.
- *
- * @param record hcat.
- * @return E edge value.
- */
+ * get edge value from record.
+ *
+ * @param record hcat.
+ * @return E edge value.
+ */
protected abstract E getEdgeValue(HCatRecord record);
@Override
public final Vertex<I, V, E, M>
- getCurrentVertex()
- throws IOException, InterruptedException {
+ getCurrentVertex() throws IOException, InterruptedException {
return vertex;
}
@Override
- public boolean nextVertex()
- throws IOException, InterruptedException {
+ public boolean nextVertex() throws IOException, InterruptedException {
while (getRecordReader().nextKeyValue()) {
HCatRecord record =
- getRecordReader().getCurrentValue();
+ getRecordReader().getCurrentValue();
if (currentVertexId == null) {
currentVertexId = getVertexId(record);
}
if (currentVertexId.equals(getVertexId(record))) {
destEdgeMap.put(
- getTargetVertexId(record),
- getEdgeValue(record));
+ getTargetVertexId(record),
+ getEdgeValue(record));
recordsForVertex.add(record);
} else {
createCurrentVertex();
@@ -368,11 +368,10 @@ public abstract class HCatalogVertexInpu
}
/**
- * create current vertex.
- */
+ * create current vertex.
+ */
private void createCurrentVertex() {
- vertex = BspUtils.
- createVertex(getContext().getConfiguration());
+ vertex = getConfiguration().createVertex();
vertex.initialize(currentVertexId, getVertexValue(recordsForVertex),
destEdgeMap);
destEdgeMap.clear();
Modified: giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java?rev=1398568&r1=1398567&r2=1398568&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java (original)
+++ giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java Mon Oct 15 23:12:19 2012
@@ -22,7 +22,6 @@ import org.apache.accumulo.core.data.Key
import org.apache.accumulo.core.data.Value;
import org.apache.giraph.io.accumulo.AccumuloVertexInputFormat;
import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.BspUtils;
import org.apache.giraph.graph.VertexReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
@@ -80,8 +79,7 @@ public class AccumuloEdgeInputFormat
Key key = getRecordReader().getCurrentKey();
Value value = getRecordReader().getCurrentValue();
Vertex<Text, Text, Text, Text> vertex =
- BspUtils.<Text, Text, Text, Text>createVertex(
- getContext().getConfiguration());
+ getConfiguration().createVertex();
Text vertexId = key.getRow();
Map<Text, Text> edges = Maps.newHashMap();
String edge = new String(value.get());
Modified: giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java?rev=1398568&r1=1398567&r2=1398568&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java (original)
+++ giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java Mon Oct 15 23:12:19 2012
@@ -20,15 +20,11 @@ package org.apache.giraph.io.hbase.edgem
import com.google.common.collect.Maps;
import org.apache.giraph.io.hbase.HBaseVertexInputFormat;
import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.BspUtils;
import org.apache.giraph.graph.VertexReader;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;
@@ -45,7 +41,6 @@ public class TableEdgeInputFormat extend
private static final Logger log =
Logger.getLogger(TableEdgeInputFormat.class);
private static final Text uselessEdgeValue = new Text();
- private Configuration conf;
public VertexReader<Text, Text, Text, Text>
createVertexReader(InputSplit split,
@@ -83,8 +78,7 @@ public class TableEdgeInputFormat extend
throws IOException, InterruptedException {
Result row = getRecordReader().getCurrentValue();
Vertex<Text, Text, Text, Text> vertex =
- BspUtils.<Text, Text, Text, Text>
- createVertex(getContext().getConfiguration());
+ getConfiguration().createVertex();
Text vertexId = new Text(Bytes.toString(row.getRow()));
Map<Text, Text> edges = Maps.newHashMap();
String edge = Bytes.toString(row.getValue(CF, CHILDREN));