You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/02/05 23:20:32 UTC

git commit: GIRAPH-493: Remove EdgeWithSource (nitay)

Updated Branches:
  refs/heads/trunk f34e9b7c8 -> 01b353334


GIRAPH-493: Remove EdgeWithSource (nitay)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/01b35333
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/01b35333
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/01b35333

Branch: refs/heads/trunk
Commit: 01b353334417fe9d511c239d013dedd822c8235a
Parents: f34e9b7
Author: Nitay Joffe <ni...@apache.org>
Authored: Wed Jan 30 00:46:01 2013 -0500
Committer: Nitay Joffe <ni...@apache.org>
Committed: Tue Feb 5 17:20:26 2013 -0500

----------------------------------------------------------------------
 CHANGELOG                                          |    2 +
 .../org/apache/giraph/graph/EdgeWithSource.java    |   65 ---------------
 .../main/java/org/apache/giraph/io/EdgeReader.java |   16 +++-
 .../io/formats/PseudoRandomEdgeInputFormat.java    |   20 +++--
 .../giraph/io/formats/TextEdgeInputFormat.java     |   49 +++++++++---
 .../giraph/worker/EdgeInputSplitsCallable.java     |   17 ++--
 .../io/hcatalog/HCatalogEdgeInputFormat.java       |   28 ++++--
 7 files changed, 92 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/01b35333/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index b18c417..cc0ca34 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-493: Remove EdgeWithSource (nitay)
+
   GIRAPH-429: Number of input split threads set to 1 less than necessary (majakabiljo)
   
   GIRAPH-498: We should check input splits status from zookeeper once per worker, 

http://git-wip-us.apache.org/repos/asf/giraph/blob/01b35333/giraph-core/src/main/java/org/apache/giraph/graph/EdgeWithSource.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/EdgeWithSource.java b/giraph-core/src/main/java/org/apache/giraph/graph/EdgeWithSource.java
deleted file mode 100644
index 84232c9..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/EdgeWithSource.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * A pair of source vertex id and Edge object (that is,
- * all the information about an edge).
- *
- * @param <I> Vertex id
- * @param <E> Edge data
- */
-public class EdgeWithSource<I extends WritableComparable, E extends Writable> {
-  /** Source vertex id. */
-  private final I sourceVertexId;
-  /** Edge. */
-  private final Edge<I, E> edge;
-
-  /**
-   * Constructor.
-   *
-   * @param sourceVertexId Source vertex id
-   * @param edge Edge
-   */
-  public EdgeWithSource(I sourceVertexId, Edge<I, E> edge) {
-    this.sourceVertexId = sourceVertexId;
-    this.edge = edge;
-  }
-
-  /**
-   * Get the source vertex id.
-   *
-   * @return Source vertex id.
-   */
-  public I getSourceVertexId() {
-    return sourceVertexId;
-  }
-
-  /**
-   * Get the edge object.
-   *
-   * @return The edge.
-   */
-  public Edge<I, E> getEdge() {
-    return edge;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/01b35333/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java b/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
index 9642fab..ed6fad1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
@@ -18,7 +18,7 @@
 
 package org.apache.giraph.io;
 
-import org.apache.giraph.graph.EdgeWithSource;
+import org.apache.giraph.graph.Edge;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -48,6 +48,7 @@ public interface EdgeReader<I extends WritableComparable, E extends Writable> {
       IOException, InterruptedException;
 
   /**
+   * Read the next edge.
    *
    * @return false iff there are no more edges
    * @throws IOException
@@ -56,6 +57,16 @@ public interface EdgeReader<I extends WritableComparable, E extends Writable> {
   boolean nextEdge() throws IOException, InterruptedException;
 
   /**
+   * Get the current edge source id.
+   *
+   * @return Current edge source id which has been read.
+   *         nextEdge() should be called first.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  I getCurrentSourceId() throws IOException, InterruptedException;
+
+  /**
    * Get the current edge.
    *
    * @return the current edge which has been read.
@@ -63,8 +74,7 @@ public interface EdgeReader<I extends WritableComparable, E extends Writable> {
    * @throws IOException
    * @throws InterruptedException
    */
-  EdgeWithSource<I, E> getCurrentEdge() throws IOException,
-      InterruptedException;
+  Edge<I, E> getCurrentEdge() throws IOException, InterruptedException;
 
   /**
    * Close this {@link EdgeReader} to future operations.

http://git-wip-us.apache.org/repos/asf/giraph/blob/01b35333/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java
index 9196f18..116f45e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java
@@ -18,13 +18,12 @@
 
 package org.apache.giraph.io.formats;
 
-import com.google.common.collect.Sets;
 import org.apache.giraph.bsp.BspInputSplit;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.DefaultEdge;
+import org.apache.giraph.graph.Edge;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeReader;
-import org.apache.giraph.graph.EdgeWithSource;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -32,6 +31,8 @@ import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.log4j.Logger;
 
+import com.google.common.collect.Sets;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -149,8 +150,8 @@ public class PseudoRandomEdgeInputFormat
     }
 
     @Override
-    public EdgeWithSource<LongWritable, DoubleWritable> getCurrentEdge()
-      throws IOException, InterruptedException {
+    public LongWritable getCurrentSourceId() throws IOException,
+        InterruptedException {
       if (currentVertexEdgesRead == edgesPerVertex) {
         ++verticesRead;
         currentVertexId = new LongWritable(-1);
@@ -165,7 +166,12 @@ public class PseudoRandomEdgeInputFormat
         random.setSeed(currentVertexId.get());
         currentVertexDestVertices.clear();
       }
+      return currentVertexId;
+    }
 
+    @Override
+    public Edge<LongWritable, DoubleWritable> getCurrentEdge()
+      throws IOException, InterruptedException {
       LongWritable destVertexId;
       do {
         destVertexId =
@@ -178,11 +184,9 @@ public class PseudoRandomEdgeInputFormat
         LOG.trace("getCurrentEdge: Return edge (" + currentVertexId + ", " +
             "" + destVertexId + ")");
       }
-      return new EdgeWithSource<LongWritable, DoubleWritable>(
-          currentVertexId,
-          new DefaultEdge<LongWritable, DoubleWritable>(
+      return new DefaultEdge<LongWritable, DoubleWritable>(
               destVertexId,
-              new DoubleWritable(random.nextDouble())));
+              new DoubleWritable(random.nextDouble()));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/01b35333/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java
index 45ea61a..a8ebfda 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java
@@ -20,9 +20,9 @@ package org.apache.giraph.io.formats;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.DefaultEdge;
+import org.apache.giraph.graph.Edge;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeReader;
-import org.apache.giraph.graph.EdgeWithSource;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
@@ -152,14 +152,19 @@ public abstract class TextEdgeInputFormat<I extends WritableComparable,
    */
   protected abstract class TextEdgeReaderFromEachLine extends TextEdgeReader {
     @Override
-    public final EdgeWithSource<I, E> getCurrentEdge() throws IOException,
+    public final I getCurrentSourceId() throws IOException,
+        InterruptedException {
+      Text line = getRecordReader().getCurrentValue();
+      return getSourceVertexId(line);
+    }
+
+    @Override
+    public final Edge<I, E> getCurrentEdge() throws IOException,
         InterruptedException {
       Text line = getRecordReader().getCurrentValue();
-      I sourceVertexId = getSourceVertexId(line);
       I targetVertexId = getTargetVertexId(line);
       E edgeValue = getValue(line);
-      return new EdgeWithSource<I, E>(sourceVertexId,
-          new DefaultEdge<I, E>(targetVertexId, edgeValue));
+      return new DefaultEdge<I, E>(targetVertexId, edgeValue);
     }
 
     @Override
@@ -214,20 +219,42 @@ public abstract class TextEdgeInputFormat<I extends WritableComparable,
    */
   protected abstract class TextEdgeReaderFromEachLineProcessed<T> extends
       TextEdgeReader {
+    /** Generic type holding processed line */
+    private T processedLine;
+
     @Override
-    public final EdgeWithSource<I, E> getCurrentEdge() throws IOException,
+    public I getCurrentSourceId() throws IOException, InterruptedException {
+      T processed = processCurrentLine();
+      return getSourceVertexId(processed);
+    }
+
+    @Override
+    public final Edge<I, E> getCurrentEdge() throws IOException,
         InterruptedException {
-      Text line = getRecordReader().getCurrentValue();
-      T processed = preprocessLine(line);
-      I sourceVertexId = getSourceVertexId(processed);
+      T processed = processCurrentLine();
       I targetVertexId = getTargetVertexId(processed);
       E edgeValue = getValue(processed);
-      return new EdgeWithSource<I, E>(sourceVertexId,
-          new DefaultEdge<I, E>(targetVertexId, edgeValue));
+      return new DefaultEdge<I, E>(targetVertexId, edgeValue);
+    }
+
+    /**
+     * Process the current line to the user's type.
+     *
+     * @return T processed line
+     * @throws IOException on I/O error
+     * @throws InterruptedException on interruption
+     */
+    private T processCurrentLine() throws IOException, InterruptedException {
+      if (processedLine == null) {
+        Text line = getRecordReader().getCurrentValue();
+        processedLine = preprocessLine(line);
+      }
+      return processedLine;
     }
 
     @Override
     public final boolean nextEdge() throws IOException, InterruptedException {
+      processedLine = null;
       return getRecordReader().nextKeyValue();
     }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/01b35333/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
index 80c341c..3e2dc66 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
@@ -19,11 +19,11 @@
 package org.apache.giraph.worker;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.io.EdgeInputFormat;
-import org.apache.giraph.io.EdgeReader;
-import org.apache.giraph.graph.EdgeWithSource;
+import org.apache.giraph.graph.Edge;
 import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.EdgeReader;
 import org.apache.giraph.metrics.GiraphMetrics;
 import org.apache.giraph.metrics.GiraphMetricsRegistry;
 import org.apache.giraph.utils.LoggerUtils;
@@ -114,25 +114,26 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
     edgeReader.initialize(inputSplit, context);
     long inputSplitEdgesLoaded = 0;
     while (edgeReader.nextEdge()) {
-      EdgeWithSource<I, E> readerEdge = edgeReader.getCurrentEdge();
-      if (readerEdge.getSourceVertexId() == null) {
+      I sourceId = edgeReader.getCurrentSourceId();
+      Edge<I, E> readerEdge = edgeReader.getCurrentEdge();
+      if (sourceId == null) {
         throw new IllegalArgumentException(
             "readInputSplit: Edge reader returned an edge " +
                 "without a source vertex id!  - " + readerEdge);
       }
-      if (readerEdge.getEdge().getTargetVertexId() == null) {
+      if (readerEdge.getTargetVertexId() == null) {
         throw new IllegalArgumentException(
             "readInputSplit: Edge reader returned an edge " +
                 "without a target vertex id!  - " + readerEdge);
       }
-      if (readerEdge.getEdge().getValue() == null) {
+      if (readerEdge.getValue() == null) {
         throw new IllegalArgumentException(
             "readInputSplit: Edge reader returned an edge " +
                 "without a value!  - " + readerEdge);
       }
 
       graphState.getWorkerClientRequestProcessor().addEdgeRequest(
-          readerEdge.getSourceVertexId(), readerEdge.getEdge());
+          sourceId, readerEdge);
       context.progress(); // do this before potential data transfer
       ++inputSplitEdgesLoaded;
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/01b35333/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java
index c92cc34..fe0ddd5 100644
--- a/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java
+++ b/giraph-hcatalog/src/main/java/org/apache/giraph/io/hcatalog/HCatalogEdgeInputFormat.java
@@ -19,8 +19,8 @@
 package org.apache.giraph.io.hcatalog;
 
 import org.apache.giraph.graph.DefaultEdge;
+import org.apache.giraph.graph.Edge;
 import org.apache.giraph.graph.EdgeNoValue;
-import org.apache.giraph.graph.EdgeWithSource;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeReader;
 import org.apache.hadoop.io.NullWritable;
@@ -193,13 +193,17 @@ public abstract class HCatalogEdgeInputFormat<
     protected abstract E getEdgeValue(HCatRecord record);
 
     @Override
-    public EdgeWithSource<I, E> getCurrentEdge() throws IOException,
+    public I getCurrentSourceId() throws IOException, InterruptedException {
+      HCatRecord record = getRecordReader().getCurrentValue();
+      return getSourceVertexId(record);
+    }
+
+    @Override
+    public Edge<I, E> getCurrentEdge() throws IOException,
         InterruptedException {
       HCatRecord record = getRecordReader().getCurrentValue();
-      return new EdgeWithSource<I, E>(
-          getSourceVertexId(record),
-          new DefaultEdge<I, E>(getTargetVertexId(record),
-              getEdgeValue(record)));
+      return new DefaultEdge<I, E>(getTargetVertexId(record),
+          getEdgeValue(record));
     }
   }
 
@@ -236,12 +240,16 @@ public abstract class HCatalogEdgeInputFormat<
     protected abstract I getTargetVertexId(HCatRecord record);
 
     @Override
-    public EdgeWithSource<I, NullWritable> getCurrentEdge() throws IOException,
+    public I getCurrentSourceId() throws IOException, InterruptedException {
+      HCatRecord record = getRecordReader().getCurrentValue();
+      return getSourceVertexId(record);
+    }
+
+    @Override
+    public Edge<I, NullWritable> getCurrentEdge() throws IOException,
         InterruptedException {
       HCatRecord record = getRecordReader().getCurrentValue();
-      return new EdgeWithSource<I, NullWritable>(
-          getSourceVertexId(record),
-          new EdgeNoValue<I>(getTargetVertexId(record)));
+      return new EdgeNoValue<I>(getTargetVertexId(record));
     }
   }
 }