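You are viewing a plain-text version of this content. The canonical version is linked from the original mailing-list archive page; that hyperlink is not preserved in this plain-text rendering.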
Posted to commits@giraph.apache.org by ma...@apache.org on 2019/08/16 14:33:04 UTC
[giraph] branch trunk updated: JIRA-1222
This is an automated email from the ASF dual-hosted git repository.
maja pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/giraph.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5edbdb2 JIRA-1222
5edbdb2 is described below
commit 5edbdb28dcd8895d31b19313f9441791f6b71bc9
Author: Maja Kabiljo <ma...@fb.com>
AuthorDate: Fri Aug 16 07:29:50 2019 -0700
JIRA-1222
closes #104
---
.../framework/output/BlockOutputDesc.java | 12 +++++++
.../framework/output/BlockOutputHandle.java | 5 +++
.../org/apache/giraph/io/EdgeOutputFormat.java | 33 +----------------
.../{EdgeOutputFormat.java => OutputFormat.java} | 42 +++++++++++-----------
.../org/apache/giraph/io/VertexOutputFormat.java | 34 +-----------------
.../io/internal/WrappedEdgeOutputFormat.java | 10 ++++++
.../io/internal/WrappedVertexOutputFormat.java | 10 ++++++
.../MultiThreadedSuperstepOutput.java | 2 ++
.../SynchronizedSuperstepOutput.java | 9 +++--
.../org/apache/giraph/worker/BspServiceWorker.java | 6 ++++
10 files changed, 76 insertions(+), 87 deletions(-)
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputDesc.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputDesc.java
index 6f2a3dd..5ac4d35 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputDesc.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputDesc.java
@@ -44,6 +44,18 @@ public interface BlockOutputDesc<OW extends BlockOutputWriter> {
OW createOutputWriter(Configuration conf, Progressable hadoopProgressable);
/**
+ * This method will be called before creating any writers
+ */
+ default void preWriting() {
+ }
+
+ /**
+ * This method will be called after all writers are closed
+ */
+ default void postWriting() {
+ }
+
+ /**
* Commit everything
*/
void commit();
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputHandle.java
index 76fd768..a16963d 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputHandle.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputHandle.java
@@ -52,6 +52,7 @@ public class BlockOutputHandle implements BlockOutputApi {
outputDescMap = BlockOutputFormat.createInitAndCheckOutputDescsMap(
conf, jobIdentifier);
for (String confOption : outputDescMap.keySet()) {
+ outputDescMap.get(confOption).preWriting();
freeWriters.put(confOption,
new ConcurrentLinkedQueue<BlockOutputWriter>());
occupiedWriters.put(confOption,
@@ -127,5 +128,9 @@ public class BlockOutputHandle implements BlockOutputApi {
ProgressableUtils.getResultsWithNCallables(callableFactory,
Math.min(GiraphConstants.NUM_OUTPUT_THREADS.get(conf),
allWriters.size()), "close-writers-%d", progressable);
+ // Close all output formats
+ for (BlockOutputDesc outputDesc : outputDescMap.values()) {
+ outputDesc.postWriting();
+ }
}
}
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/EdgeOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/EdgeOutputFormat.java
index ac4c6ce..0749399 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/EdgeOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/EdgeOutputFormat.java
@@ -20,11 +20,8 @@ package org.apache.giraph.io;
import java.io.IOException;
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
/**
@@ -38,7 +35,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
public abstract class EdgeOutputFormat<
I extends WritableComparable, V extends Writable,
E extends Writable> extends
- DefaultImmutableClassesGiraphConfigurable<I, V, E> {
+ OutputFormat<I, V, E> {
/**
* Create an edge writer for a given split. The framework will call
* {@link EdgeWriter#initialize(TaskAttemptContext)} before
@@ -51,32 +48,4 @@ public abstract class EdgeOutputFormat<
*/
public abstract EdgeWriter<I, V, E> createEdgeWriter(
TaskAttemptContext context) throws IOException, InterruptedException;
-
- /**
- * Check for validity of the output-specification for the job.
- * (Copied from Hadoop OutputFormat)
- *
- * <p>This is to validate the output specification for the job when it is
- * a job is submitted. Typically checks that it does not already exist,
- * throwing an exception when it already exists, so that output is not
- * overwritten.</p>
- *
- * @param context information about the job
- * @throws IOException when output should not be attempted
- */
- public abstract void checkOutputSpecs(JobContext context)
- throws IOException, InterruptedException;
-
- /**
- * Get the output committer for this output format. This is responsible
- * for ensuring the output is committed correctly.
- * (Copied from Hadoop OutputFormat)
- *
- * @param context the task context
- * @return an output committer
- * @throws IOException
- * @throws InterruptedException
- */
- public abstract OutputCommitter getOutputCommitter(
- TaskAttemptContext context) throws IOException, InterruptedException;
}
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/EdgeOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/OutputFormat.java
similarity index 77%
copy from giraph-core/src/main/java/org/apache/giraph/io/EdgeOutputFormat.java
copy to giraph-core/src/main/java/org/apache/giraph/io/OutputFormat.java
index ac4c6ce..59ad7be 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/EdgeOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/OutputFormat.java
@@ -18,8 +18,6 @@
package org.apache.giraph.io;
-import java.io.IOException;
-
import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -27,32 +25,20 @@ import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import java.io.IOException;
+
/**
- * abstract class which can only write edges
+ * Parent class for vertex and edge output formats
*
* @param <I> Vertex id
* @param <V> Vertex value
* @param <E> Edge value
*/
-@SuppressWarnings("rawtypes")
-public abstract class EdgeOutputFormat<
+public abstract class OutputFormat<
I extends WritableComparable, V extends Writable,
E extends Writable> extends
DefaultImmutableClassesGiraphConfigurable<I, V, E> {
/**
- * Create an edge writer for a given split. The framework will call
- * {@link EdgeWriter#initialize(TaskAttemptContext)} before
- * the split is used.
- *
- * @param context the information about the task
- * @return a new vertex writer
- * @throws IOException
- * @throws InterruptedException
- */
- public abstract EdgeWriter<I, V, E> createEdgeWriter(
- TaskAttemptContext context) throws IOException, InterruptedException;
-
- /**
* Check for validity of the output-specification for the job.
* (Copied from Hadoop OutputFormat)
*
@@ -65,7 +51,7 @@ public abstract class EdgeOutputFormat<
* @throws IOException when output should not be attempted
*/
public abstract void checkOutputSpecs(JobContext context)
- throws IOException, InterruptedException;
+ throws IOException, InterruptedException;
/**
* Get the output committer for this output format. This is responsible
@@ -78,5 +64,21 @@ public abstract class EdgeOutputFormat<
* @throws InterruptedException
*/
public abstract OutputCommitter getOutputCommitter(
- TaskAttemptContext context) throws IOException, InterruptedException;
+ TaskAttemptContext context) throws IOException, InterruptedException;
+
+ /**
+ * This method will be called before creating any writers
+ *
+ * @param context the task context
+ */
+ public void preWriting(TaskAttemptContext context) {
+ }
+
+ /**
+ * This method will be called after all writers are closed
+ *
+ * @param context the task context
+ */
+ public void postWriting(TaskAttemptContext context) {
+ }
}
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/VertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/VertexOutputFormat.java
index ad00a8e..a571ee7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/VertexOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/VertexOutputFormat.java
@@ -20,10 +20,6 @@ package org.apache.giraph.io;
import java.io.IOException;
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -49,7 +45,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
public abstract class VertexOutputFormat<
I extends WritableComparable, V extends Writable,
E extends Writable> extends
- DefaultImmutableClassesGiraphConfigurable<I, V, E> {
+ OutputFormat<I, V, E> {
/**
* Create a vertex writer for a given split. The framework will call
* {@link VertexWriter#initialize(TaskAttemptContext)} before
@@ -62,32 +58,4 @@ public abstract class VertexOutputFormat<
*/
public abstract VertexWriter<I, V, E> createVertexWriter(
TaskAttemptContext context) throws IOException, InterruptedException;
-
- /**
- * Check for validity of the output-specification for the job.
- * (Copied from Hadoop OutputFormat)
- *
- * <p>This is to validate the output specification for the job when it is
- * a job is submitted. Typically checks that it does not already exist,
- * throwing an exception when it already exists, so that output is not
- * overwritten.</p>
- *
- * @param context information about the job
- * @throws IOException when output should not be attempted
- */
- public abstract void checkOutputSpecs(JobContext context)
- throws IOException, InterruptedException;
-
- /**
- * Get the output committer for this output format. This is responsible
- * for ensuring the output is committed correctly.
- * (Copied from Hadoop OutputFormat)
- *
- * @param context the task context
- * @return an output committer
- * @throws IOException
- * @throws InterruptedException
- */
- public abstract OutputCommitter getOutputCommitter(
- TaskAttemptContext context) throws IOException, InterruptedException;
}
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeOutputFormat.java
index 988e556..40891f2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeOutputFormat.java
@@ -164,4 +164,14 @@ public class WrappedEdgeOutputFormat<I extends WritableComparable,
}
};
}
+
+ @Override
+ public void preWriting(TaskAttemptContext context) {
+ originalOutputFormat.preWriting(context);
+ }
+
+ @Override
+ public void postWriting(TaskAttemptContext context) {
+ originalOutputFormat.postWriting(context);
+ }
}
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexOutputFormat.java
index 06771c4..6350b4e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexOutputFormat.java
@@ -161,4 +161,14 @@ public class WrappedVertexOutputFormat<I extends WritableComparable,
}
};
}
+
+ @Override
+ public void preWriting(TaskAttemptContext context) {
+ originalOutputFormat.preWriting(context);
+ }
+
+ @Override
+ public void postWriting(TaskAttemptContext context) {
+ originalOutputFormat.postWriting(context);
+ }
}
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java
index b54ee64..92e3853 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java
@@ -75,6 +75,7 @@ public class MultiThreadedSuperstepOutput<I extends WritableComparable,
this.context = context;
availableVertexWriters = Lists.newArrayList();
occupiedVertexWriters = Sets.newHashSet();
+ vertexOutputFormat.preWriting(context);
}
@Override
@@ -145,5 +146,6 @@ public class MultiThreadedSuperstepOutput<I extends WritableComparable,
ProgressableUtils.getResultsWithNCallables(callableFactory,
Math.min(configuration.getNumOutputThreads(),
availableVertexWriters.size()), "close-writers-%d", context);
+ vertexOutputFormat.postWriting(context);
}
}
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SynchronizedSuperstepOutput.java b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SynchronizedSuperstepOutput.java
index 7f233e0..6c7049b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SynchronizedSuperstepOutput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SynchronizedSuperstepOutput.java
@@ -22,6 +22,7 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.SimpleVertexWriter;
import org.apache.giraph.io.VertexWriter;
+import org.apache.giraph.io.internal.WrappedVertexOutputFormat;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
@@ -43,6 +44,8 @@ public class SynchronizedSuperstepOutput<I extends WritableComparable,
private final Mapper<?, ?, ?, ?>.Context context;
/** Main vertex writer */
private final VertexWriter<I, V, E> vertexWriter;
+ /** Vertex output format */
+ private final WrappedVertexOutputFormat<I, V, E> vertexOutputFormat;
/**
* Simple vertex writer, wrapper for {@link #vertexWriter}.
* Call to writeVertex is thread-safe.
@@ -61,8 +64,9 @@ public class SynchronizedSuperstepOutput<I extends WritableComparable,
Mapper<?, ?, ?, ?>.Context context) {
this.context = context;
try {
- vertexWriter =
- conf.createWrappedVertexOutputFormat().createVertexWriter(context);
+ vertexOutputFormat = conf.createWrappedVertexOutputFormat();
+ vertexOutputFormat.preWriting(context);
+ vertexWriter = vertexOutputFormat.createVertexWriter(context);
vertexWriter.setConf(conf);
vertexWriter.initialize(context);
} catch (IOException e) {
@@ -93,5 +97,6 @@ public class SynchronizedSuperstepOutput<I extends WritableComparable,
@Override
public void postApplication() throws IOException, InterruptedException {
vertexWriter.close(context);
+ vertexOutputFormat.postWriting(context);
}
}
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 794e1a0..8d024b1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -976,6 +976,7 @@ else[HADOOP_NON_SECURE]*/
"using " + numThreads + " threads");
final VertexOutputFormat<I, V, E> vertexOutputFormat =
getConfiguration().createWrappedVertexOutputFormat();
+ vertexOutputFormat.preWriting(getContext());
getPartitionStore().startIteration();
@@ -1050,6 +1051,8 @@ else[HADOOP_NON_SECURE]*/
ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
"save-vertices-%d", getContext());
+ vertexOutputFormat.postWriting(getContext());
+
LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
"saveVertices: Done saving vertices.");
// YARN: must complete the commit the "task" output, Hadoop isn't there.
@@ -1100,6 +1103,7 @@ else[HADOOP_NON_SECURE]*/
numThreads + " threads");
final EdgeOutputFormat<I, V, E> edgeOutputFormat =
conf.createWrappedEdgeOutputFormat();
+ edgeOutputFormat.preWriting(getContext());
getPartitionStore().startIteration();
@@ -1159,6 +1163,8 @@ else[HADOOP_NON_SECURE]*/
ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
"save-vertices-%d", getContext());
+ edgeOutputFormat.postWriting(getContext());
+
LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
"saveEdges: Done saving edges.");
// YARN: must complete the commit the "task" output, Hadoop isn't there.