You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/05/06 22:30:46 UTC
git commit: updated refs/heads/trunk to d8baf4b
Updated Branches:
refs/heads/trunk b88292dbf -> d8baf4b80
GIRAPH-656: Input from multiple tables doesn't work with multithreading (majakabiljo)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/d8baf4b8
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/d8baf4b8
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/d8baf4b8
Branch: refs/heads/trunk
Commit: d8baf4b80b4c98073bfd82d29ccf46b11847ee4e
Parents: b88292d
Author: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Authored: Mon May 6 13:27:27 2013 -0700
Committer: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Committed: Mon May 6 13:28:16 2013 -0700
----------------------------------------------------------------------
CHANGELOG | 3 +
.../io/formats/multi/MultiEdgeInputFormat.java | 37 +++++-
.../io/formats/multi/MultiVertexInputFormat.java | 37 +++++-
.../giraph/io/internal/WrappedEdgeInputFormat.java | 48 +-------
.../giraph/io/internal/WrappedEdgeReader.java | 96 +++++++++++++++
.../io/internal/WrappedVertexInputFormat.java | 41 +------
.../giraph/io/internal/WrappedVertexReader.java | 93 ++++++++++++++
7 files changed, 255 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/d8baf4b8/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 8a2b9eb..ea10fac 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,9 @@
Giraph Change Log
Release 1.0.1 - unreleased
+ GIRAPH-656: Input from multiple tables doesn't work with multithreading
+ (majakabiljo)
+
GIRAPH-657: Remove unused reuseIncomingEdgeObjects option (apresta)
GIRAPH-592: YourKit profiler (nitay)
http://git-wip-us.apache.org/repos/asf/giraph/blob/d8baf4b8/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java
index 113b3bc..c377fbc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java
@@ -21,6 +21,7 @@ package org.apache.giraph.io.formats.multi;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.EdgeReader;
+import org.apache.giraph.io.internal.WrappedEdgeReader;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -61,11 +62,29 @@ public class MultiEdgeInputFormat<I extends WritableComparable,
public EdgeReader<I, E> createEdgeReader(InputSplit inputSplit,
TaskAttemptContext context) throws IOException {
if (inputSplit instanceof InputSplitWithInputFormatIndex) {
- InputSplitWithInputFormatIndex split =
- (InputSplitWithInputFormatIndex) inputSplit;
- EdgeInputFormat<I, E> edgeInputFormat =
- edgeInputFormats.get(split.getInputFormatIndex());
- return edgeInputFormat.createEdgeReader(split.getSplit(), context);
+ // When multithreaded input is used we need to make sure other threads
+ // don't change context's configuration while we use it
+ synchronized (context) {
+ InputSplitWithInputFormatIndex split =
+ (InputSplitWithInputFormatIndex) inputSplit;
+ EdgeInputFormat<I, E> edgeInputFormat =
+ edgeInputFormats.get(split.getInputFormatIndex());
+ EdgeReader<I, E> edgeReader =
+ edgeInputFormat.createEdgeReader(split.getSplit(), context);
+ return new WrappedEdgeReader<I, E>(
+ edgeReader, edgeInputFormat.getConf()) {
+ @Override
+ public void initialize(InputSplit inputSplit,
+ TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ // When multithreaded input is used we need to make sure other
+ // threads don't change context's configuration while we use it
+ synchronized (context) {
+ super.initialize(inputSplit, context);
+ }
+ }
+ };
+ }
} else {
throw new IllegalStateException("createEdgeReader: Got InputSplit which" +
" was not created by this class: " + inputSplit.getClass().getName());
@@ -75,8 +94,12 @@ public class MultiEdgeInputFormat<I extends WritableComparable,
@Override
public List<InputSplit> getSplits(JobContext context,
int minSplitCountHint) throws IOException, InterruptedException {
- return
- MultiInputUtils.getSplits(context, minSplitCountHint, edgeInputFormats);
+ // When multithreaded input is used we need to make sure other threads don't
+ // change context's configuration while we use it
+ synchronized (context) {
+ return MultiInputUtils.getSplits(
+ context, minSplitCountHint, edgeInputFormats);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/d8baf4b8/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java
index 631a451..72929d9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java
@@ -21,6 +21,7 @@ package org.apache.giraph.io.formats.multi;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexReader;
+import org.apache.giraph.io.internal.WrappedVertexReader;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -62,11 +63,29 @@ public class MultiVertexInputFormat<I extends WritableComparable,
public VertexReader<I, V, E> createVertexReader(InputSplit inputSplit,
TaskAttemptContext context) throws IOException {
if (inputSplit instanceof InputSplitWithInputFormatIndex) {
- InputSplitWithInputFormatIndex split =
- (InputSplitWithInputFormatIndex) inputSplit;
- VertexInputFormat<I, V, E> vertexInputFormat =
- vertexInputFormats.get(split.getInputFormatIndex());
- return vertexInputFormat.createVertexReader(split.getSplit(), context);
+ // When multithreaded input is used we need to make sure other threads
+ // don't change context's configuration while we use it
+ synchronized (context) {
+ InputSplitWithInputFormatIndex split =
+ (InputSplitWithInputFormatIndex) inputSplit;
+ VertexInputFormat<I, V, E> vertexInputFormat =
+ vertexInputFormats.get(split.getInputFormatIndex());
+ VertexReader<I, V, E> vertexReader =
+ vertexInputFormat.createVertexReader(split.getSplit(), context);
+ return new WrappedVertexReader<I, V, E>(
+ vertexReader, vertexInputFormat.getConf()) {
+ @Override
+ public void initialize(InputSplit inputSplit,
+ TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ // When multithreaded input is used we need to make sure other
+ // threads don't change context's configuration while we use it
+ synchronized (context) {
+ super.initialize(inputSplit, context);
+ }
+ }
+ };
+ }
} else {
throw new IllegalStateException("createVertexReader: Got InputSplit " +
"which was not created by this class: " +
@@ -77,8 +96,12 @@ public class MultiVertexInputFormat<I extends WritableComparable,
@Override
public List<InputSplit> getSplits(JobContext context,
int minSplitCountHint) throws IOException, InterruptedException {
- return MultiInputUtils.getSplits(context, minSplitCountHint,
- vertexInputFormats);
+ // When multithreaded input is used we need to make sure other threads don't
+ // change context's configuration while we use it
+ synchronized (context) {
+ return MultiInputUtils.getSplits(
+ context, minSplitCountHint, vertexInputFormats);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/d8baf4b8/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java
index 928b975..9c209dd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java
@@ -18,8 +18,6 @@
package org.apache.giraph.io.internal;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.edge.Edge;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.EdgeReader;
import org.apache.hadoop.io.Writable;
@@ -70,51 +68,9 @@ public class WrappedEdgeInputFormat<I extends WritableComparable,
public EdgeReader<I, E> createEdgeReader(InputSplit split,
TaskAttemptContext context) throws IOException {
getConf().updateConfiguration(context.getConfiguration());
- final EdgeReader<I, E> edgeReader =
+ EdgeReader<I, E> edgeReader =
originalInputFormat.createEdgeReader(split, context);
- return new EdgeReader<I, E>() {
- @Override
- public void setConf(
- ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> conf) {
- WrappedEdgeInputFormat.this.getConf().updateConfiguration(conf);
- super.setConf(conf);
- edgeReader.setConf(conf);
- }
-
- @Override
- public void initialize(InputSplit inputSplit,
- TaskAttemptContext context) throws IOException, InterruptedException {
- WrappedEdgeInputFormat.this.getConf().updateConfiguration(
- context.getConfiguration());
- edgeReader.initialize(inputSplit, context);
- }
-
- @Override
- public boolean nextEdge() throws IOException, InterruptedException {
- return edgeReader.nextEdge();
- }
-
- @Override
- public I getCurrentSourceId() throws IOException, InterruptedException {
- return edgeReader.getCurrentSourceId();
- }
-
- @Override
- public Edge<I, E> getCurrentEdge() throws IOException,
- InterruptedException {
- return edgeReader.getCurrentEdge();
- }
-
- @Override
- public void close() throws IOException {
- edgeReader.close();
- }
-
- @Override
- public float getProgress() throws IOException, InterruptedException {
- return edgeReader.getProgress();
- }
- };
+ return new WrappedEdgeReader<I, E>(edgeReader, getConf());
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/d8baf4b8/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java
new file mode 100644
index 0000000..c0a2cd1
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.internal;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.io.EdgeReader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * For internal use only.
+ *
+ * Wraps an {@link EdgeReader} and delegates all reading to it. The
+ * configuration passed to the constructor is set on both readers, and this
+ * reader's configuration is updated from the task context on initialize.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge data
+ */
+public class WrappedEdgeReader<I extends WritableComparable,
+ E extends Writable> extends EdgeReader<I, E> {
+ /** EdgeReader that every reading method is delegated to */
+ private final EdgeReader<I, E> baseEdgeReader;
+
+ /**
+ * Constructor. Sets the configuration on this reader and the base reader.
+ *
+ * @param baseEdgeReader EdgeReader to delegate all the methods to
+ * @param conf Configuration set on this reader and on the base reader
+ */
+ public WrappedEdgeReader(EdgeReader<I, E> baseEdgeReader,
+ ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> conf) {
+ this.baseEdgeReader = baseEdgeReader;
+ super.setConf(conf);
+ baseEdgeReader.setConf(conf);
+ }
+
+ @Override
+ public void setConf(
+ ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> conf) {
+ // Intentionally empty: keep the configuration given to the constructor
+ }
+
+ @Override
+ public void initialize(InputSplit inputSplit,
+ TaskAttemptContext context) throws IOException, InterruptedException {
+ getConf().updateConfiguration(context.getConfiguration());
+ baseEdgeReader.initialize(inputSplit, context);
+ }
+
+ @Override
+ public boolean nextEdge() throws IOException, InterruptedException {
+ return baseEdgeReader.nextEdge();
+ }
+
+ @Override
+ public I getCurrentSourceId() throws IOException, InterruptedException {
+ return baseEdgeReader.getCurrentSourceId();
+ }
+
+ @Override
+ public Edge<I, E> getCurrentEdge() throws IOException, InterruptedException {
+ return baseEdgeReader.getCurrentEdge();
+ }
+
+ @Override
+ public void close() throws IOException {
+ baseEdgeReader.close();
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return baseEdgeReader.getProgress();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/d8baf4b8/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java
index ed606e3..f5379c1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java
@@ -18,8 +18,6 @@
package org.apache.giraph.io.internal;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexReader;
import org.apache.hadoop.io.Writable;
@@ -74,44 +72,7 @@ public class WrappedVertexInputFormat<I extends WritableComparable,
getConf().updateConfiguration(context.getConfiguration());
final VertexReader<I, V, E> vertexReader =
originalInputFormat.createVertexReader(split, context);
- return new VertexReader<I, V, E>() {
- @Override
- public void setConf(
- ImmutableClassesGiraphConfiguration<I, V, E, Writable> conf) {
- WrappedVertexInputFormat.this.getConf().updateConfiguration(conf);
- super.setConf(conf);
- vertexReader.setConf(conf);
- }
-
- @Override
- public void initialize(InputSplit inputSplit,
- TaskAttemptContext context) throws IOException, InterruptedException {
- WrappedVertexInputFormat.this.getConf().updateConfiguration(
- context.getConfiguration());
- vertexReader.initialize(inputSplit, context);
- }
-
- @Override
- public boolean nextVertex() throws IOException, InterruptedException {
- return vertexReader.nextVertex();
- }
-
- @Override
- public Vertex<I, V, E, ?> getCurrentVertex() throws IOException,
- InterruptedException {
- return vertexReader.getCurrentVertex();
- }
-
- @Override
- public void close() throws IOException {
- vertexReader.close();
- }
-
- @Override
- public float getProgress() throws IOException, InterruptedException {
- return vertexReader.getProgress();
- }
- };
+ return new WrappedVertexReader<I, V, E>(vertexReader, getConf());
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/d8baf4b8/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java
new file mode 100644
index 0000000..3a8ac50
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.internal;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.VertexReader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * For internal use only.
+ *
+ * Wraps a {@link VertexReader} and delegates all reading to it. The
+ * configuration passed to the constructor is set on both readers, and this
+ * reader's configuration is updated from the task context on initialize.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ */
+public class WrappedVertexReader<I extends WritableComparable,
+ V extends Writable, E extends Writable> extends VertexReader<I, V, E> {
+ /** VertexReader that every reading method is delegated to */
+ private final VertexReader<I, V, E> baseVertexReader;
+
+ /**
+ * Constructor. Sets the configuration on this reader and the base reader.
+ *
+ * @param baseVertexReader VertexReader to delegate all the methods to
+ * @param conf Configuration set on this reader and on the base reader
+ */
+ public WrappedVertexReader(VertexReader<I, V, E> baseVertexReader,
+ ImmutableClassesGiraphConfiguration<I, V, E, Writable> conf) {
+ this.baseVertexReader = baseVertexReader;
+ super.setConf(conf);
+ baseVertexReader.setConf(conf);
+ }
+
+ @Override
+ public void setConf(
+ ImmutableClassesGiraphConfiguration<I, V, E, Writable> conf) {
+ // Intentionally empty: keep the configuration given to the constructor
+ }
+
+ @Override
+ public void initialize(InputSplit inputSplit,
+ TaskAttemptContext context) throws IOException, InterruptedException {
+ getConf().updateConfiguration(context.getConfiguration());
+ baseVertexReader.initialize(inputSplit, context);
+ }
+
+ @Override
+ public boolean nextVertex() throws IOException, InterruptedException {
+ return baseVertexReader.nextVertex();
+ }
+
+ @Override
+ public Vertex<I, V, E, ?> getCurrentVertex() throws IOException,
+ InterruptedException {
+ return baseVertexReader.getCurrentVertex();
+ }
+
+ @Override
+ public void close() throws IOException {
+ baseVertexReader.close();
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return baseVertexReader.getProgress();
+ }
+}