You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/05/06 22:30:46 UTC

git commit: updated refs/heads/trunk to d8baf4b

Updated Branches:
  refs/heads/trunk b88292dbf -> d8baf4b80


GIRAPH-656: Input from multiple tables doesn't work with multithreading (majakabiljo)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/d8baf4b8
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/d8baf4b8
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/d8baf4b8

Branch: refs/heads/trunk
Commit: d8baf4b80b4c98073bfd82d29ccf46b11847ee4e
Parents: b88292d
Author: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Authored: Mon May 6 13:27:27 2013 -0700
Committer: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Committed: Mon May 6 13:28:16 2013 -0700

----------------------------------------------------------------------
 CHANGELOG                                          |    3 +
 .../io/formats/multi/MultiEdgeInputFormat.java     |   37 +++++-
 .../io/formats/multi/MultiVertexInputFormat.java   |   37 +++++-
 .../giraph/io/internal/WrappedEdgeInputFormat.java |   48 +-------
 .../giraph/io/internal/WrappedEdgeReader.java      |   96 +++++++++++++++
 .../io/internal/WrappedVertexInputFormat.java      |   41 +------
 .../giraph/io/internal/WrappedVertexReader.java    |   93 ++++++++++++++
 7 files changed, 255 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/d8baf4b8/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 8a2b9eb..ea10fac 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,9 @@
 Giraph Change Log
 
 Release 1.0.1 - unreleased
+  GIRAPH-656: Input from multiple tables doesn't work with multithreading
+  (majakabiljo)
+
   GIRAPH-657: Remove unused reuseIncomingEdgeObjects option (apresta)
 
   GIRAPH-592: YourKit profiler (nitay)

http://git-wip-us.apache.org/repos/asf/giraph/blob/d8baf4b8/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java
index 113b3bc..c377fbc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java
@@ -21,6 +21,7 @@ package org.apache.giraph.io.formats.multi;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeReader;
+import org.apache.giraph.io.internal.WrappedEdgeReader;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -61,11 +62,29 @@ public class MultiEdgeInputFormat<I extends WritableComparable,
   public EdgeReader<I, E> createEdgeReader(InputSplit inputSplit,
       TaskAttemptContext context) throws IOException {
     if (inputSplit instanceof InputSplitWithInputFormatIndex) {
-      InputSplitWithInputFormatIndex split =
-          (InputSplitWithInputFormatIndex) inputSplit;
-      EdgeInputFormat<I, E> edgeInputFormat =
-          edgeInputFormats.get(split.getInputFormatIndex());
-      return edgeInputFormat.createEdgeReader(split.getSplit(), context);
+      // When multithreaded input is used we need to make sure other threads
+      // don't change context's configuration while we use it
+      synchronized (context) {
+        InputSplitWithInputFormatIndex split =
+            (InputSplitWithInputFormatIndex) inputSplit;
+        EdgeInputFormat<I, E> edgeInputFormat =
+            edgeInputFormats.get(split.getInputFormatIndex());
+        EdgeReader<I, E> edgeReader =
+            edgeInputFormat.createEdgeReader(split.getSplit(), context);
+        return new WrappedEdgeReader<I, E>(
+            edgeReader, edgeInputFormat.getConf()) {
+          @Override
+          public void initialize(InputSplit inputSplit,
+              TaskAttemptContext context) throws IOException,
+              InterruptedException {
+            // When multithreaded input is used we need to make sure other
+            // threads don't change context's configuration while we use it
+            synchronized (context) {
+              super.initialize(inputSplit, context);
+            }
+          }
+        };
+      }
     } else {
       throw new IllegalStateException("createEdgeReader: Got InputSplit which" +
           " was not created by this class: " + inputSplit.getClass().getName());
@@ -75,8 +94,12 @@ public class MultiEdgeInputFormat<I extends WritableComparable,
   @Override
   public List<InputSplit> getSplits(JobContext context,
       int minSplitCountHint) throws IOException, InterruptedException {
-    return
-        MultiInputUtils.getSplits(context, minSplitCountHint, edgeInputFormats);
+    // When multithreaded input is used we need to make sure other threads don't
+    // change context's configuration while we use it
+    synchronized (context) {
+      return MultiInputUtils.getSplits(
+          context, minSplitCountHint, edgeInputFormats);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/d8baf4b8/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java
index 631a451..72929d9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java
@@ -21,6 +21,7 @@ package org.apache.giraph.io.formats.multi;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexReader;
+import org.apache.giraph.io.internal.WrappedVertexReader;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -62,11 +63,29 @@ public class MultiVertexInputFormat<I extends WritableComparable,
   public VertexReader<I, V, E> createVertexReader(InputSplit inputSplit,
       TaskAttemptContext context) throws IOException {
     if (inputSplit instanceof InputSplitWithInputFormatIndex) {
-      InputSplitWithInputFormatIndex split =
-          (InputSplitWithInputFormatIndex) inputSplit;
-      VertexInputFormat<I, V, E> vertexInputFormat =
-          vertexInputFormats.get(split.getInputFormatIndex());
-      return vertexInputFormat.createVertexReader(split.getSplit(), context);
+      // When multithreaded input is used we need to make sure other threads
+      // don't change context's configuration while we use it
+      synchronized (context) {
+        InputSplitWithInputFormatIndex split =
+            (InputSplitWithInputFormatIndex) inputSplit;
+        VertexInputFormat<I, V, E> vertexInputFormat =
+            vertexInputFormats.get(split.getInputFormatIndex());
+        VertexReader<I, V, E> vertexReader =
+            vertexInputFormat.createVertexReader(split.getSplit(), context);
+        return new WrappedVertexReader<I, V, E>(
+            vertexReader, vertexInputFormat.getConf()) {
+          @Override
+          public void initialize(InputSplit inputSplit,
+              TaskAttemptContext context) throws IOException,
+              InterruptedException {
+            // When multithreaded input is used we need to make sure other
+            // threads don't change context's configuration while we use it
+            synchronized (context) {
+              super.initialize(inputSplit, context);
+            }
+          }
+        };
+      }
     } else {
       throw new IllegalStateException("createVertexReader: Got InputSplit " +
           "which was not created by this class: " +
@@ -77,8 +96,12 @@ public class MultiVertexInputFormat<I extends WritableComparable,
   @Override
   public List<InputSplit> getSplits(JobContext context,
       int minSplitCountHint) throws IOException, InterruptedException {
-    return MultiInputUtils.getSplits(context, minSplitCountHint,
-        vertexInputFormats);
+    // When multithreaded input is used we need to make sure other threads don't
+    // change context's configuration while we use it
+    synchronized (context) {
+      return MultiInputUtils.getSplits(
+          context, minSplitCountHint, vertexInputFormats);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/d8baf4b8/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java
index 928b975..9c209dd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java
@@ -18,8 +18,6 @@
 
 package org.apache.giraph.io.internal;
 
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.edge.Edge;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeReader;
 import org.apache.hadoop.io.Writable;
@@ -70,51 +68,9 @@ public class WrappedEdgeInputFormat<I extends WritableComparable,
   public EdgeReader<I, E> createEdgeReader(InputSplit split,
       TaskAttemptContext context) throws IOException {
     getConf().updateConfiguration(context.getConfiguration());
-    final EdgeReader<I, E> edgeReader =
+    EdgeReader<I, E> edgeReader =
         originalInputFormat.createEdgeReader(split, context);
-    return new EdgeReader<I, E>() {
-      @Override
-      public void setConf(
-          ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> conf) {
-        WrappedEdgeInputFormat.this.getConf().updateConfiguration(conf);
-        super.setConf(conf);
-        edgeReader.setConf(conf);
-      }
-
-      @Override
-      public void initialize(InputSplit inputSplit,
-          TaskAttemptContext context) throws IOException, InterruptedException {
-        WrappedEdgeInputFormat.this.getConf().updateConfiguration(
-            context.getConfiguration());
-        edgeReader.initialize(inputSplit, context);
-      }
-
-      @Override
-      public boolean nextEdge() throws IOException, InterruptedException {
-        return edgeReader.nextEdge();
-      }
-
-      @Override
-      public I getCurrentSourceId() throws IOException, InterruptedException {
-        return edgeReader.getCurrentSourceId();
-      }
-
-      @Override
-      public Edge<I, E> getCurrentEdge() throws IOException,
-          InterruptedException {
-        return edgeReader.getCurrentEdge();
-      }
-
-      @Override
-      public void close() throws IOException {
-        edgeReader.close();
-      }
-
-      @Override
-      public float getProgress() throws IOException, InterruptedException {
-        return edgeReader.getProgress();
-      }
-    };
+    return new WrappedEdgeReader<I, E>(edgeReader, getConf());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/d8baf4b8/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java
new file mode 100644
index 0000000..c0a2cd1
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.internal;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.io.EdgeReader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * For internal use only.
+ *
+ * Wraps {@link EdgeReader} to make sure proper configuration
+ * parameters are passed around, so that parameters set in the original
+ * configuration are available in the methods of this reader.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge data
+ */
+public class WrappedEdgeReader<I extends WritableComparable,
+    E extends Writable> extends EdgeReader<I, E> {
+  /** EdgeReader to delegate the methods to */
+  private final EdgeReader<I, E> baseEdgeReader;
+
+  /**
+   * Constructor
+   *
+   * @param baseEdgeReader EdgeReader to delegate all the methods to
+   * @param conf Configuration
+   */
+  public WrappedEdgeReader(EdgeReader<I, E> baseEdgeReader,
+      ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> conf) {
+    this.baseEdgeReader = baseEdgeReader;
+    super.setConf(conf);
+    baseEdgeReader.setConf(conf);
+  }
+
+  @Override
+  public void setConf(
+      ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> conf) {
+    // We don't want to use external configuration
+  }
+
+  @Override
+  public void initialize(InputSplit inputSplit,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    getConf().updateConfiguration(context.getConfiguration());
+    baseEdgeReader.initialize(inputSplit, context);
+  }
+
+  @Override
+  public boolean nextEdge() throws IOException, InterruptedException {
+    return baseEdgeReader.nextEdge();
+  }
+
+  @Override
+  public I getCurrentSourceId() throws IOException, InterruptedException {
+    return baseEdgeReader.getCurrentSourceId();
+  }
+
+  @Override
+  public Edge<I, E> getCurrentEdge() throws IOException, InterruptedException {
+    return baseEdgeReader.getCurrentEdge();
+  }
+
+  @Override
+  public void close() throws IOException {
+    baseEdgeReader.close();
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    return baseEdgeReader.getProgress();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/d8baf4b8/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java
index ed606e3..f5379c1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java
@@ -18,8 +18,6 @@
 
 package org.apache.giraph.io.internal;
 
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexReader;
 import org.apache.hadoop.io.Writable;
@@ -74,44 +72,7 @@ public class WrappedVertexInputFormat<I extends WritableComparable,
     getConf().updateConfiguration(context.getConfiguration());
     final VertexReader<I, V, E> vertexReader =
         originalInputFormat.createVertexReader(split, context);
-    return new VertexReader<I, V, E>() {
-      @Override
-      public void setConf(
-          ImmutableClassesGiraphConfiguration<I, V, E, Writable> conf) {
-        WrappedVertexInputFormat.this.getConf().updateConfiguration(conf);
-        super.setConf(conf);
-        vertexReader.setConf(conf);
-      }
-
-      @Override
-      public void initialize(InputSplit inputSplit,
-          TaskAttemptContext context) throws IOException, InterruptedException {
-        WrappedVertexInputFormat.this.getConf().updateConfiguration(
-            context.getConfiguration());
-        vertexReader.initialize(inputSplit, context);
-      }
-
-      @Override
-      public boolean nextVertex() throws IOException, InterruptedException {
-        return vertexReader.nextVertex();
-      }
-
-      @Override
-      public Vertex<I, V, E, ?> getCurrentVertex() throws IOException,
-          InterruptedException {
-        return vertexReader.getCurrentVertex();
-      }
-
-      @Override
-      public void close() throws IOException {
-        vertexReader.close();
-      }
-
-      @Override
-      public float getProgress() throws IOException, InterruptedException {
-        return vertexReader.getProgress();
-      }
-    };
+    return new WrappedVertexReader<I, V, E>(vertexReader, getConf());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/d8baf4b8/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java
new file mode 100644
index 0000000..3a8ac50
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.internal;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.VertexReader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * For internal use only.
+ *
+ * Wraps {@link VertexReader} to make sure proper configuration
+ * parameters are passed around, so that parameters set in the original
+ * configuration are available in the methods of this reader.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ */
+public class WrappedVertexReader<I extends WritableComparable,
+    V extends Writable, E extends Writable> extends VertexReader<I, V, E> {
+  /** VertexReader to delegate the methods to */
+  private final VertexReader<I, V, E> baseVertexReader;
+
+  /**
+   * Constructor
+   *
+   * @param baseVertexReader VertexReader to delegate all the methods to
+   * @param conf Configuration
+   */
+  public WrappedVertexReader(VertexReader<I, V, E> baseVertexReader,
+      ImmutableClassesGiraphConfiguration<I, V, E, Writable> conf) {
+    this.baseVertexReader = baseVertexReader;
+    super.setConf(conf);
+    baseVertexReader.setConf(conf);
+  }
+
+  @Override
+  public void setConf(
+      ImmutableClassesGiraphConfiguration<I, V, E, Writable> conf) {
+    // We don't want to use external configuration
+  }
+
+  @Override
+  public void initialize(InputSplit inputSplit,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    getConf().updateConfiguration(context.getConfiguration());
+    baseVertexReader.initialize(inputSplit, context);
+  }
+
+  @Override
+  public boolean nextVertex() throws IOException, InterruptedException {
+    return baseVertexReader.nextVertex();
+  }
+
+  @Override
+  public Vertex<I, V, E, ?> getCurrentVertex() throws IOException,
+      InterruptedException {
+    return baseVertexReader.getCurrentVertex();
+  }
+
+  @Override
+  public void close() throws IOException {
+    baseVertexReader.close();
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    return baseVertexReader.getProgress();
+  }
+}