Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/08/02 23:11:29 UTC

[GitHub] [spark] mccheah commented on a change in pull request #25007: [SPARK-28209][CORE][SHUFFLE] Proposed new shuffle writer API

URL: https://github.com/apache/spark/pull/25007#discussion_r310322735
 
 

 ##########
 File path: core/src/main/java/org/apache/spark/shuffle/api/ShufflePartitionWriter.java
 ##########
 @@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.api;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+
+import org.apache.spark.annotation.Private;
+import org.apache.spark.shuffle.sort.io.DefaultWritableByteChannelWrapper;
+
+/**
+ * :: Private ::
+ * An interface for opening streams to persist partition bytes to a backing data store.
+ * <p>
+ * This writer stores bytes for one (mapper, reducer) pair, corresponding to one shuffle
+ * block.
+ *
+ * @since 3.0.0
+ */
+@Private
+public interface ShufflePartitionWriter {
+
+  /**
+   * Open and return an {@link OutputStream} that can write bytes to the underlying
+   * data store.
+   * <p>
+   * This method will only be called once on this partition writer in the map task, to write the
+   * bytes to the partition. The output stream will only be used to write the bytes for this
+   * partition. The map task closes this output stream upon writing all the bytes for this
+   * block, or if the write fails for any reason.
+   * <p>
+   * Implementations that intend to combine the bytes for all the partitions written by this
+   * map task should reuse the same OutputStream instance across all the partition writers provided
+   * by the parent {@link ShuffleMapOutputWriter}. If one does so, ensure that
+   * {@link OutputStream#close()} does not close the resource, since it will be reused across
+   * partition writes. The underlying resources should be cleaned up in
+   * {@link ShuffleMapOutputWriter#commitAllPartitions()} and
+   * {@link ShuffleMapOutputWriter#abort(Throwable)}.
+   */
+  OutputStream openStream() throws IOException;
+
+  /**
+   * Opens and returns a {@link WritableByteChannelWrapper} for transferring bytes from
+   * input byte channels to the underlying shuffle data store.
+   * <p>
+   * This method will only be called once on this partition writer in the map task, to write the
+   * bytes to the partition. The channel will only be used to write the bytes for this
+   * partition. The map task closes this channel upon writing all the bytes for this
+   * block, or if the write fails for any reason.
+   * <p>
+   * Implementations that intend to combine the bytes for all the partitions written by this
+   * map task should reuse the same channel instance across all the partition writers provided
+   * by the parent {@link ShuffleMapOutputWriter}. If one does so, ensure that
+   * {@link WritableByteChannelWrapper#close()} does not close the resource, since it
+   * will be reused across partition writes. The underlying resources should be cleaned up in
+   * {@link ShuffleMapOutputWriter#commitAllPartitions()} and
+   * {@link ShuffleMapOutputWriter#abort(Throwable)}.
+   * <p>
+   * This method is primarily for advanced optimizations where bytes can be copied from the input
+   * spill files to the output channel without copying data into memory.
+   * <p>
+   * The default implementation should be sufficient for most situations. Only override this
+   * method if there is a very specific optimization that needs to be built.
+   * <p>
+   * Note that the returned {@link WritableByteChannelWrapper} itself is closed, but not the
+   * underlying channel that is returned by {@link WritableByteChannelWrapper#channel()}. Ensure that
+   * the underlying channel is cleaned up in {@link WritableByteChannelWrapper#close()},
+   * {@link ShuffleMapOutputWriter#commitAllPartitions()}, or
+   * {@link ShuffleMapOutputWriter#abort(Throwable)}.
+   */
+  default WritableByteChannelWrapper openChannelWrapper() throws IOException {
+    return new DefaultWritableByteChannelWrapper(Channels.newChannel(openStream()));
+  }
+
+  /**
+   * Returns the number of bytes written either by this writer's output stream opened by
+   * {@link #openStream()} or the byte channel opened by {@link #openChannelWrapper()}.
+   * <p>
+   * This can be different from the number of bytes given by the caller. For example, the
+   * stream might compress or encrypt the bytes before persisting the data to the backing
+   * data store.
+   */
+  long getNumBytesWritten();
 
 Review comment:
  But again, do we call `getNumBytesWritten` before or after calling `close` on this object? If before, does the count include any bytes that might only be written when `close`-ing the stream (for example, padding or a compression trailer)? If after, are we going to be invoking methods on a closed resource, and is that reasonable?
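To make the ordering question concrete, here is a minimal JDK-only sketch (not Spark code). It assumes a stream that compresses before persisting, as the `getNumBytesWritten` Javadoc mentions, modeled here with `java.util.zip.GZIPOutputStream`. A hypothetical `CountingOutputStream` counts the bytes that actually reach the backing store. `GZIPOutputStream` writes its 8-byte trailer (plus any compressed data still buffered in the deflater) only when the stream is finished or closed, so a count taken before `close()` misses those bytes.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.GZIPOutputStream;

public class CloseTimeBytesDemo {

  /** Hypothetical helper: counts every byte that reaches the wrapped (backing) stream. */
  static final class CountingOutputStream extends FilterOutputStream {
    private long count = 0;

    CountingOutputStream(OutputStream out) {
      super(out);
    }

    @Override
    public void write(int b) throws IOException {
      out.write(b);
      count++;
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      out.write(b, off, len);
      count += len;
    }

    long getCount() {
      return count;
    }
  }

  /** Returns {bytes persisted before close(), bytes persisted after close()}. */
  static long[] countBeforeAndAfterClose(byte[] payload) throws IOException {
    CountingOutputStream counter = new CountingOutputStream(new ByteArrayOutputStream());
    // The GZIP header is written to the counter as soon as the stream is constructed.
    GZIPOutputStream compressed = new GZIPOutputStream(counter);
    compressed.write(payload);
    compressed.flush(); // without syncFlush, this does not force buffered deflate output
    long beforeClose = counter.getCount();
    compressed.close(); // finishes the deflate stream and writes the 8-byte GZIP trailer
    long afterClose = counter.getCount();
    return new long[] {beforeClose, afterClose};
  }

  public static void main(String[] args) throws IOException {
    long[] counts = countBeforeAndAfterClose(new byte[1024]);
    System.out.println("before close: " + counts[0] + ", after close: " + counts[1]);
  }
}
```

The count is still readable after `close()` here only because it is a plain field on the counting wrapper. Whether `getNumBytesWritten` may be called once the writer's stream has been closed is the contract the Javadoc would need to state explicitly.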

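A sketch of the reuse pattern the `openStream()` Javadoc describes, assuming all partitions of one map task append to a single shared stream. `NonClosingPartitionStream` and `writePartitions` are hypothetical names for illustration, not part of the proposed API. Each per-partition view counts its own bytes and, on `close()`, only flushes, leaving the shared stream open for the next partition; closing the shared resource is left to a commit or abort step.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;

public class SharedStreamDemo {

  /** Hypothetical per-partition view over a stream shared by all partitions of a map task. */
  static final class NonClosingPartitionStream extends FilterOutputStream {
    private long bytesWritten = 0;
    private boolean closed = false;

    NonClosingPartitionStream(OutputStream shared) {
      super(shared);
    }

    @Override
    public void write(int b) throws IOException {
      ensureOpen();
      out.write(b);
      bytesWritten++;
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      ensureOpen();
      out.write(b, off, len);
      bytesWritten += len;
    }

    @Override
    public void close() throws IOException {
      if (!closed) {
        out.flush(); // flush, but deliberately do NOT close the shared stream
        closed = true;
      }
    }

    long getNumBytesWritten() {
      return bytesWritten;
    }

    private void ensureOpen() throws IOException {
      if (closed) {
        throw new IOException("partition stream already closed");
      }
    }
  }

  /** Writes each partition through its own view; returns the per-partition byte counts. */
  static long[] writePartitions(OutputStream shared, byte[][] partitions) throws IOException {
    long[] lengths = new long[partitions.length];
    for (int i = 0; i < partitions.length; i++) {
      NonClosingPartitionStream partitionStream = new NonClosingPartitionStream(shared);
      try {
        partitionStream.write(partitions[i]);
      } finally {
        partitionStream.close(); // the map task closes the per-partition stream
      }
      lengths[i] = partitionStream.getNumBytesWritten();
    }
    return lengths;
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream shared = new ByteArrayOutputStream();
    byte[][] partitions = {{1, 2}, {}, {3, 4, 5}};
    long[] lengths = writePartitions(shared, partitions);
    shared.close(); // "commit" step: only now is the shared resource closed
    System.out.println(Arrays.toString(lengths) + ", total=" + shared.size());
    // prints [2, 0, 3], total=5
  }
}
```

Because this view adds nothing on close, reading its count before or after `close()` gives the same answer. A view that wraps a compressing or encrypting stream would not have that property.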
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 