You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2019/08/16 14:33:04 UTC

[giraph] branch trunk updated: JIRA-1222

This is an automated email from the ASF dual-hosted git repository.

maja pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/giraph.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5edbdb2  JIRA-1222
5edbdb2 is described below

commit 5edbdb28dcd8895d31b19313f9441791f6b71bc9
Author: Maja Kabiljo <ma...@fb.com>
AuthorDate: Fri Aug 16 07:29:50 2019 -0700

    JIRA-1222
    
    closes #104
---
 .../framework/output/BlockOutputDesc.java          | 12 +++++++
 .../framework/output/BlockOutputHandle.java        |  5 +++
 .../org/apache/giraph/io/EdgeOutputFormat.java     | 33 +----------------
 .../{EdgeOutputFormat.java => OutputFormat.java}   | 42 +++++++++++-----------
 .../org/apache/giraph/io/VertexOutputFormat.java   | 34 +-----------------
 .../io/internal/WrappedEdgeOutputFormat.java       | 10 ++++++
 .../io/internal/WrappedVertexOutputFormat.java     | 10 ++++++
 .../MultiThreadedSuperstepOutput.java              |  2 ++
 .../SynchronizedSuperstepOutput.java               |  9 +++--
 .../org/apache/giraph/worker/BspServiceWorker.java |  6 ++++
 10 files changed, 76 insertions(+), 87 deletions(-)

diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputDesc.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputDesc.java
index 6f2a3dd..5ac4d35 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputDesc.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputDesc.java
@@ -44,6 +44,18 @@ public interface BlockOutputDesc<OW extends BlockOutputWriter> {
   OW createOutputWriter(Configuration conf, Progressable hadoopProgressable);
 
   /**
+   * This method will be called before creating any writers
+   */
+  default void preWriting() {
+  }
+
+  /**
+   * This method will be called after all writers are closed
+   */
+  default void postWriting() {
+  }
+
+  /**
    * Commit everything
    */
   void commit();
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputHandle.java
index 76fd768..a16963d 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputHandle.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputHandle.java
@@ -52,6 +52,7 @@ public class BlockOutputHandle implements BlockOutputApi {
     outputDescMap = BlockOutputFormat.createInitAndCheckOutputDescsMap(
         conf, jobIdentifier);
     for (String confOption : outputDescMap.keySet()) {
+      outputDescMap.get(confOption).preWriting();
       freeWriters.put(confOption,
           new ConcurrentLinkedQueue<BlockOutputWriter>());
       occupiedWriters.put(confOption,
@@ -127,5 +128,9 @@ public class BlockOutputHandle implements BlockOutputApi {
     ProgressableUtils.getResultsWithNCallables(callableFactory,
         Math.min(GiraphConstants.NUM_OUTPUT_THREADS.get(conf),
             allWriters.size()), "close-writers-%d", progressable);
+    // Notify each output desc that all of its writers are closed
+    for (BlockOutputDesc outputDesc : outputDescMap.values()) {
+      outputDesc.postWriting();
+    }
   }
 }
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/EdgeOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/EdgeOutputFormat.java
index ac4c6ce..0749399 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/EdgeOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/EdgeOutputFormat.java
@@ -20,11 +20,8 @@ package org.apache.giraph.io;
 
 import java.io.IOException;
 
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 /**
@@ -38,7 +35,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 public abstract class EdgeOutputFormat<
     I extends WritableComparable, V extends Writable,
     E extends Writable> extends
-    DefaultImmutableClassesGiraphConfigurable<I, V, E> {
+    OutputFormat<I, V, E> {
   /**
    * Create an edge writer for a given split. The framework will call
    * {@link EdgeWriter#initialize(TaskAttemptContext)} before
@@ -51,32 +48,4 @@ public abstract class EdgeOutputFormat<
    */
   public abstract EdgeWriter<I, V, E> createEdgeWriter(
     TaskAttemptContext context) throws IOException, InterruptedException;
-
-  /**
-   * Check for validity of the output-specification for the job.
-   * (Copied from Hadoop OutputFormat)
-   *
-   * <p>This is to validate the output specification for the job when it is
-   * a job is submitted.  Typically checks that it does not already exist,
-   * throwing an exception when it already exists, so that output is not
-   * overwritten.</p>
-   *
-   * @param  context information about the job
-   * @throws IOException when output should not be attempted
-   */
-  public abstract void checkOutputSpecs(JobContext context)
-    throws IOException, InterruptedException;
-
-  /**
-   * Get the output committer for this output format. This is responsible
-   * for ensuring the output is committed correctly.
-   * (Copied from Hadoop OutputFormat)
-   *
-   * @param context the task context
-   * @return an output committer
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  public abstract OutputCommitter getOutputCommitter(
-    TaskAttemptContext context) throws IOException, InterruptedException;
 }
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/EdgeOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/OutputFormat.java
similarity index 77%
copy from giraph-core/src/main/java/org/apache/giraph/io/EdgeOutputFormat.java
copy to giraph-core/src/main/java/org/apache/giraph/io/OutputFormat.java
index ac4c6ce..59ad7be 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/EdgeOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/OutputFormat.java
@@ -18,8 +18,6 @@
 
 package org.apache.giraph.io;
 
-import java.io.IOException;
-
 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -27,32 +25,20 @@ import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
+import java.io.IOException;
+
 /**
- * abstract class which can only write edges
+ * Parent class for vertex and edge output formats
  *
  * @param <I> Vertex id
  * @param <V> Vertex value
  * @param <E> Edge value
  */
-@SuppressWarnings("rawtypes")
-public abstract class EdgeOutputFormat<
+public abstract class OutputFormat<
     I extends WritableComparable, V extends Writable,
     E extends Writable> extends
     DefaultImmutableClassesGiraphConfigurable<I, V, E> {
   /**
-   * Create an edge writer for a given split. The framework will call
-   * {@link EdgeWriter#initialize(TaskAttemptContext)} before
-   * the split is used.
-   *
-   * @param context the information about the task
-   * @return a new vertex writer
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  public abstract EdgeWriter<I, V, E> createEdgeWriter(
-    TaskAttemptContext context) throws IOException, InterruptedException;
-
-  /**
    * Check for validity of the output-specification for the job.
    * (Copied from Hadoop OutputFormat)
    *
@@ -65,7 +51,7 @@ public abstract class EdgeOutputFormat<
    * @throws IOException when output should not be attempted
    */
   public abstract void checkOutputSpecs(JobContext context)
-    throws IOException, InterruptedException;
+      throws IOException, InterruptedException;
 
   /**
    * Get the output committer for this output format. This is responsible
@@ -78,5 +64,21 @@ public abstract class EdgeOutputFormat<
    * @throws InterruptedException
    */
   public abstract OutputCommitter getOutputCommitter(
-    TaskAttemptContext context) throws IOException, InterruptedException;
+      TaskAttemptContext context) throws IOException, InterruptedException;
+
+  /**
+   * This method will be called before creating any writers
+   *
+   * @param context the task context
+   */
+  public void preWriting(TaskAttemptContext context) {
+  }
+
+  /**
+   * This method will be called after all writers are closed
+   *
+   * @param context the task context
+   */
+  public void postWriting(TaskAttemptContext context) {
+  }
 }
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/VertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/VertexOutputFormat.java
index ad00a8e..a571ee7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/VertexOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/VertexOutputFormat.java
@@ -20,10 +20,6 @@ package org.apache.giraph.io;
 
 import java.io.IOException;
 
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -49,7 +45,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 public abstract class VertexOutputFormat<
     I extends WritableComparable, V extends Writable,
     E extends Writable> extends
-    DefaultImmutableClassesGiraphConfigurable<I, V, E> {
+    OutputFormat<I, V, E> {
   /**
    * Create a vertex writer for a given split. The framework will call
    * {@link VertexWriter#initialize(TaskAttemptContext)} before
@@ -62,32 +58,4 @@ public abstract class VertexOutputFormat<
    */
   public abstract VertexWriter<I, V, E> createVertexWriter(
     TaskAttemptContext context) throws IOException, InterruptedException;
-
-  /**
-   * Check for validity of the output-specification for the job.
-   * (Copied from Hadoop OutputFormat)
-   *
-   * <p>This is to validate the output specification for the job when it is
-   * a job is submitted.  Typically checks that it does not already exist,
-   * throwing an exception when it already exists, so that output is not
-   * overwritten.</p>
-   *
-   * @param context information about the job
-   * @throws IOException when output should not be attempted
-   */
-  public abstract void checkOutputSpecs(JobContext context)
-    throws IOException, InterruptedException;
-
-  /**
-   * Get the output committer for this output format. This is responsible
-   * for ensuring the output is committed correctly.
-   * (Copied from Hadoop OutputFormat)
-   *
-   * @param context the task context
-   * @return an output committer
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  public abstract OutputCommitter getOutputCommitter(
-    TaskAttemptContext context) throws IOException, InterruptedException;
 }
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeOutputFormat.java
index 988e556..40891f2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeOutputFormat.java
@@ -164,4 +164,14 @@ public class WrappedEdgeOutputFormat<I extends WritableComparable,
       }
     };
   }
+
+  @Override
+  public void preWriting(TaskAttemptContext context) {
+    originalOutputFormat.preWriting(context);
+  }
+
+  @Override
+  public void postWriting(TaskAttemptContext context) {
+    originalOutputFormat.postWriting(context);
+  }
 }
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexOutputFormat.java
index 06771c4..6350b4e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexOutputFormat.java
@@ -161,4 +161,14 @@ public class WrappedVertexOutputFormat<I extends WritableComparable,
       }
     };
   }
+
+  @Override
+  public void preWriting(TaskAttemptContext context) {
+    originalOutputFormat.preWriting(context);
+  }
+
+  @Override
+  public void postWriting(TaskAttemptContext context) {
+    originalOutputFormat.postWriting(context);
+  }
 }
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java
index b54ee64..92e3853 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java
@@ -75,6 +75,7 @@ public class MultiThreadedSuperstepOutput<I extends WritableComparable,
     this.context = context;
     availableVertexWriters = Lists.newArrayList();
     occupiedVertexWriters = Sets.newHashSet();
+    vertexOutputFormat.preWriting(context);
   }
 
   @Override
@@ -145,5 +146,6 @@ public class MultiThreadedSuperstepOutput<I extends WritableComparable,
     ProgressableUtils.getResultsWithNCallables(callableFactory,
         Math.min(configuration.getNumOutputThreads(),
             availableVertexWriters.size()), "close-writers-%d", context);
+    vertexOutputFormat.postWriting(context);
   }
 }
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SynchronizedSuperstepOutput.java b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SynchronizedSuperstepOutput.java
index 7f233e0..6c7049b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SynchronizedSuperstepOutput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SynchronizedSuperstepOutput.java
@@ -22,6 +22,7 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.io.SimpleVertexWriter;
 import org.apache.giraph.io.VertexWriter;
+import org.apache.giraph.io.internal.WrappedVertexOutputFormat;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -43,6 +44,8 @@ public class SynchronizedSuperstepOutput<I extends WritableComparable,
   private final Mapper<?, ?, ?, ?>.Context context;
   /** Main vertex writer */
   private final VertexWriter<I, V, E> vertexWriter;
+  /** Vertex output format */
+  private final WrappedVertexOutputFormat<I, V, E> vertexOutputFormat;
   /**
    * Simple vertex writer, wrapper for {@link #vertexWriter}.
    * Call to writeVertex is thread-safe.
@@ -61,8 +64,9 @@ public class SynchronizedSuperstepOutput<I extends WritableComparable,
       Mapper<?, ?, ?, ?>.Context context) {
     this.context = context;
     try {
-      vertexWriter =
-          conf.createWrappedVertexOutputFormat().createVertexWriter(context);
+      vertexOutputFormat = conf.createWrappedVertexOutputFormat();
+      vertexOutputFormat.preWriting(context);
+      vertexWriter = vertexOutputFormat.createVertexWriter(context);
       vertexWriter.setConf(conf);
       vertexWriter.initialize(context);
     } catch (IOException e) {
@@ -93,5 +97,6 @@ public class SynchronizedSuperstepOutput<I extends WritableComparable,
   @Override
   public void postApplication() throws IOException, InterruptedException {
     vertexWriter.close(context);
+    vertexOutputFormat.postWriting(context);
   }
 }
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 794e1a0..8d024b1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -976,6 +976,7 @@ else[HADOOP_NON_SECURE]*/
             "using " + numThreads + " threads");
     final VertexOutputFormat<I, V, E> vertexOutputFormat =
         getConfiguration().createWrappedVertexOutputFormat();
+    vertexOutputFormat.preWriting(getContext());
 
     getPartitionStore().startIteration();
 
@@ -1050,6 +1051,8 @@ else[HADOOP_NON_SECURE]*/
     ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
         "save-vertices-%d", getContext());
 
+    vertexOutputFormat.postWriting(getContext());
+
     LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
       "saveVertices: Done saving vertices.");
     // YARN: must complete the commit the "task" output, Hadoop isn't there.
@@ -1100,6 +1103,7 @@ else[HADOOP_NON_SECURE]*/
         numThreads + " threads");
     final EdgeOutputFormat<I, V, E> edgeOutputFormat =
         conf.createWrappedEdgeOutputFormat();
+    edgeOutputFormat.preWriting(getContext());
 
     getPartitionStore().startIteration();
 
@@ -1159,6 +1163,8 @@ else[HADOOP_NON_SECURE]*/
     ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
         "save-vertices-%d", getContext());
 
+    edgeOutputFormat.postWriting(getContext());
+
     LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
       "saveEdges: Done saving edges.");
     // YARN: must complete the commit the "task" output, Hadoop isn't there.