You are viewing a plain-text version of this content; the hyperlink to the canonical version was lost in this plain-text rendering.
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/02/05 23:20:32 UTC
git commit: GIRAPH-493: Remove EdgeWithSource (nitay)
Updated Branches:
refs/heads/trunk f34e9b7c8 -> 01b353334
GIRAPH-493: Remove EdgeWithSource (nitay)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/01b35333
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/01b35333
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/01b35333
Branch: refs/heads/trunk
Commit: 01b353334417fe9d511c239d013dedd822c8235a
Parents: f34e9b7
Author: Nitay Joffe <ni...@apache.org>
Authored: Wed Jan 30 00:46:01 2013 -0500
Committer: Nitay Joffe <ni...@apache.org>
Committed: Tue Feb 5 17:20:26 2013 -0500
----------------------------------------------------------------------
CHANGELOG | 2 +
.../org/apache/giraph/graph/EdgeWithSource.java | 65 ---------------
.../main/java/org/apache/giraph/io/EdgeReader.java | 16 +++-
.../io/formats/PseudoRandomEdgeInputFormat.java | 20 +++--
.../giraph/io/formats/TextEdgeInputFormat.java | 49 +++++++++---
.../giraph/worker/EdgeInputSplitsCallable.java | 17 ++--
.../io/hcatalog/HCatalogEdgeInputFormat.java | 28 ++++--
7 files changed, 92 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/01b35333/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index b18c417..cc0ca34 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-493: Remove EdgeWithSource (nitay)
+
GIRAPH-429: Number of input split threads set to 1 less than necessary (majakabiljo)
GIRAPH-498: We should check input splits status from zookeeeper once per worker,
http://git-wip-us.apache.org/repos/asf/giraph/blob/01b35333/giraph-core/src/main/java/org/apache/giraph/graph/EdgeWithSource.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/EdgeWithSource.java b/giraph-core/src/main/java/org/apache/giraph/graph/EdgeWithSource.java
deleted file mode 100644
index 84232c9..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/EdgeWithSource.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * A pair of source vertex id and Edge object (that is,
- * all the information about an edge).
- *
- * @param <I> Vertex id
- * @param <E> Edge data
- */
-public class EdgeWithSource<I extends WritableComparable, E extends Writable> {
- /** Source vertex id. */
- private final I sourceVertexId;
- /** Edge. */
- private final Edge<I, E> edge;
-
- /**
- * Constructor.
- *
- * @param sourceVertexId Source vertex id
- * @param edge Edge
- */
- public EdgeWithSource(I sourceVertexId, Edge<I, E> edge) {
- this.sourceVertexId = sourceVertexId;
- this.edge = edge;
- }
-
- /**
- * Get the source vertex id.
- *
- * @return Source vertex id.
- */
- public I getSourceVertexId() {
- return sourceVertexId;
- }
-
- /**
- * Get the edge object.
- *
- * @return The edge.
- */
- public Edge<I, E> getEdge() {
- return edge;
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/01b35333/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java b/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
index 9642fab..ed6fad1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
@@ -18,7 +18,7 @@
package org.apache.giraph.io;
-import org.apache.giraph.graph.EdgeWithSource;
+import org.apache.giraph.graph.Edge;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -48,6 +48,7 @@ public interface EdgeReader<I extends WritableComparable, E extends Writable> {
IOException, InterruptedException;
/**
+ * Read the next edge.
*
* @return false iff there are no more edges
* @throws IOException
@@ -56,6 +57,16 @@ public interface EdgeReader<I extends WritableComparable, E extends Writable> {
boolean nextEdge() throws IOException, InterruptedException;
/**
+ * Get the current edge source id.
+ *
+ * @return Current edge source id which has been read.
+ * nextEdge() should be called first.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ I getCurrentSourceId() throws IOException, InterruptedException;
+
+ /**
* Get the current edge.
*
* @return the current edge which has been read.
@@ -63,8 +74,7 @@ public interface EdgeReader<I extends WritableComparable, E extends Writable> {
* @throws IOException
* @throws InterruptedException
*/
- EdgeWithSource<I, E> getCurrentEdge() throws IOException,
- InterruptedException;
+ Edge<I, E> getCurrentEdge() throws IOException, InterruptedException;
/**
* Close this {@link EdgeReader} to future operations.
http://git-wip-us.apache.org/repos/asf/giraph/blob/01b35333/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java
index 9196f18..116f45e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java
@@ -18,13 +18,12 @@
package org.apache.giraph.io.formats;
-import com.google.common.collect.Sets;
import org.apache.giraph.bsp.BspInputSplit;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.DefaultEdge;
+import org.apache.giraph.graph.Edge;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.EdgeReader;
-import org.apache.giraph.graph.EdgeWithSource;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -32,6 +31,8 @@ import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;
+import com.google.common.collect.Sets;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -149,8 +150,8 @@ public class PseudoRandomEdgeInputFormat
}
@Override
- public EdgeWithSource<LongWritable, DoubleWritable> getCurrentEdge()
- throws IOException, InterruptedException {
+ public LongWritable getCurrentSourceId() throws IOException,
+ InterruptedException {
if (currentVertexEdgesRead == edgesPerVertex) {
++verticesRead;
currentVertexId = new LongWritable(-1);
@@ -165,7 +166,12 @@ public class PseudoRandomEdgeInputFormat
random.setSeed(currentVertexId.get());
currentVertexDestVertices.clear();
}
+ return currentVertexId;
+ }
+ @Override
+ public Edge<LongWritable, DoubleWritable> getCurrentEdge()
+ throws IOException, InterruptedException {
LongWritable destVertexId;
do {
destVertexId =
@@ -178,11 +184,9 @@ public class PseudoRandomEdgeInputFormat
LOG.trace("getCurrentEdge: Return edge (" + currentVertexId + ", " +
"" + destVertexId + ")");
}
- return new EdgeWithSource<LongWritable, DoubleWritable>(
- currentVertexId,
- new DefaultEdge<LongWritable, DoubleWritable>(
+ return new DefaultEdge<LongWritable, DoubleWritable>(
destVertexId,
- new DoubleWritable(random.nextDouble())));
+ new DoubleWritable(random.nextDouble()));
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/01b35333/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java
index 45ea61a..a8ebfda 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java
@@ -20,9 +20,9 @@ package org.apache.giraph.io.formats;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.DefaultEdge;
+import org.apache.giraph.graph.Edge;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.EdgeReader;
-import org.apache.giraph.graph.EdgeWithSource;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
@@ -152,14 +152,19 @@ public abstract class TextEdgeInputFormat<I extends WritableComparable,
*/
protected abstract class TextEdgeReaderFromEachLine extends TextEdgeReader {
@Override
- public final EdgeWithSource<I, E> getCurrentEdge() throws IOException,
+ public final I getCurrentSourceId() throws IOException,
+ InterruptedException {
+ Text line = getRecordReader().getCurrentValue();
+ return getSourceVertexId(line);
+ }
+
+ @Override
+ public final Edge<I, E> getCurrentEdge() throws IOException,
InterruptedException {
Text line = getRecordReader().getCurrentValue();
- I sourceVertexId = getSourceVertexId(line);
I targetVertexId = getTargetVertexId(line);
E edgeValue = getValue(line);
- return new EdgeWithSource<I, E>(sourceVertexId,
- new DefaultEdge<I, E>(targetVertexId, edgeValue));
+ return new DefaultEdge<I, E>(targetVertexId, edgeValue);
}
@Override
@@ -214,20 +219,42 @@ public abstract class TextEdgeInputFormat<I extends WritableComparable,
*/
protected abstract class TextEdgeReaderFromEachLineProcessed<T> extends
TextEdgeReader {
+ /** Generic type holding processed line */
+ private T processedLine;
+
@Override
- public final EdgeWithSource<I, E> getCurrentEdge() throws IOException,
+ public I getCurrentSourceId() throws IOException, InterruptedException {
+ T processed = processCurrentLine();
+ return getSourceVertexId(processed);
+ }
+
+ @Override
+ public final Edge<I, E> getCurrentEdge() throws IOException,
InterruptedException {
- Text line = getRecordReader().getCurrentValue();
- T processed = preprocessLine(line);
- I sourceVertexId = getSourceVertexId(processed);
+ T processed = processCurrentLine();
I targetVertexId = getTargetVertexId(processed);
E edgeValue = getValue(processed);
- return new EdgeWithSource<I, E>(sourceVertexId,
- new DefaultEdge<I, E>(targetVertexId, edgeValue));
+ return new DefaultEdge<I, E>(targetVertexId, edgeValue);
+ }
+
+ /**
+ * Process the current line to the user's type.
+ *
+ * @return T processed line
+ * @throws IOException on I/O error
+ * @throws InterruptedException on interruption
+ */
+ private T processCurrentLine() throws IOException, InterruptedException {
+ if (processedLine == null) {
+ Text line = getRecordReader().getCurrentValue();
+ processedLine = preprocessLine(line);
+ }
+ return processedLine;
}
@Override
public final boolean nextEdge() throws IOException, InterruptedException {
+ processedLine = null;
return getRecordReader().nextKeyValue();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/01b35333/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
index 80c341c..3e2dc66 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
@@ -19,11 +19,11 @@
package org.apache.giraph.worker;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.io.EdgeInputFormat;
-import org.apache.giraph.io.EdgeReader;
-import org.apache.giraph.graph.EdgeWithSource;
+import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.GraphState;
import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.EdgeReader;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.GiraphMetricsRegistry;
import org.apache.giraph.utils.LoggerUtils;
@@ -114,25 +114,26 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
edgeReader.initialize(inputSplit, context);
long inputSplitEdgesLoaded = 0;
while (edgeReader.nextEdge()) {
- EdgeWithSource<I, E> readerEdge = edgeReader.getCurrentEdge();
- if (readerEdge.getSourceVertexId() == null) {
+ I sourceId = edgeReader.getCurrentSourceId();
+ Edge<I, E> readerEdge = edgeReader.getCurrentEdge();
+ if (sourceId == null) {
throw new IllegalArgumentException(
"readInputSplit: Edge reader returned an edge " +
"without a source vertex id! - " + readerEdge);
}
- if (readerEdge.getEdge().getTargetVertexId() == null) {
+ if (readerEdge.getTargetVertexId() == null) {
throw new IllegalArgumentException(
"readInputSplit: Edge reader returned an edge " +
"without a target vertex id! - " + readerEdge);
}
- if (readerEdge.getEdge().getValue() == null) {
+ if (readerEdge.getValue() == null) {
throw new IllegalArgumentException(
"readInputSplit: Edge reader returned an edge " +
"without a value! - " + readerEdge);
}
graphState.getWorkerClientRequestProcessor().addEdgeRequest(
- readerEdge.getSourceVertexId(), readerEdge.getEdge());
+ sourceId, readerEdge);
context.progress(); // do this before potential data transfer
++inputSplitEdgesLoaded;
http://git-wip-us.apache.org/repos/asf/giraph/blob/01b35333/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java
index c92cc34..fe0ddd5 100644
--- a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java
+++ b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java
@@ -19,8 +19,8 @@
package org.apache.giraph.io.hcatalog;
import org.apache.giraph.graph.DefaultEdge;
+import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.EdgeNoValue;
-import org.apache.giraph.graph.EdgeWithSource;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.EdgeReader;
import org.apache.hadoop.io.NullWritable;
@@ -193,13 +193,17 @@ public abstract class HCatalogEdgeInputFormat<
protected abstract E getEdgeValue(HCatRecord record);
@Override
- public EdgeWithSource<I, E> getCurrentEdge() throws IOException,
+ public I getCurrentSourceId() throws IOException, InterruptedException {
+ HCatRecord record = getRecordReader().getCurrentValue();
+ return getSourceVertexId(record);
+ }
+
+ @Override
+ public Edge<I, E> getCurrentEdge() throws IOException,
InterruptedException {
HCatRecord record = getRecordReader().getCurrentValue();
- return new EdgeWithSource<I, E>(
- getSourceVertexId(record),
- new DefaultEdge<I, E>(getTargetVertexId(record),
- getEdgeValue(record)));
+ return new DefaultEdge<I, E>(getTargetVertexId(record),
+ getEdgeValue(record));
}
}
@@ -236,12 +240,16 @@ public abstract class HCatalogEdgeInputFormat<
protected abstract I getTargetVertexId(HCatRecord record);
@Override
- public EdgeWithSource<I, NullWritable> getCurrentEdge() throws IOException,
+ public I getCurrentSourceId() throws IOException, InterruptedException {
+ HCatRecord record = getRecordReader().getCurrentValue();
+ return getSourceVertexId(record);
+ }
+
+ @Override
+ public Edge<I, NullWritable> getCurrentEdge() throws IOException,
InterruptedException {
HCatRecord record = getRecordReader().getCurrentValue();
- return new EdgeWithSource<I, NullWritable>(
- getSourceVertexId(record),
- new EdgeNoValue<I>(getTargetVertexId(record)));
+ return new EdgeNoValue<I>(getTargetVertexId(record));
}
}
}