You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2016/08/04 21:54:50 UTC

spark git commit: [SPARK-15074][SHUFFLE] Cache shuffle index file to speedup shuffle fetch

Repository: spark
Updated Branches:
  refs/heads/master 0e2e5d7d0 -> 9c15d079d


[SPARK-15074][SHUFFLE] Cache shuffle index file to speedup shuffle fetch

## What changes were proposed in this pull request?

Shuffle fetch on large intermediate dataset is slow because the shuffle service open/close the index file for each shuffle fetch. This change introduces a cache for the index information so that we can avoid accessing the index files for each block fetch

## How was this patch tested?

Tested by running a job on the cluster and the shuffle read time was reduced by 50%.

Author: Sital Kedia <sk...@fb.com>

Closes #12944 from sitalkedia/shuffle_service.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9c15d079
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9c15d079
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9c15d079

Branch: refs/heads/master
Commit: 9c15d079df2418a1412269a702f3a7861daee61c
Parents: 0e2e5d7
Author: Sital Kedia <sk...@fb.com>
Authored: Thu Aug 4 14:54:38 2016 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Thu Aug 4 14:54:38 2016 -0700

----------------------------------------------------------------------
 .../spark/network/util/TransportConf.java       |  4 ++
 .../shuffle/ExternalShuffleBlockResolver.java   | 36 +++++++----
 .../shuffle/ShuffleIndexInformation.java        | 63 ++++++++++++++++++++
 .../network/shuffle/ShuffleIndexRecord.java     | 40 +++++++++++++
 docs/configuration.md                           |  7 +++
 5 files changed, 138 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9c15d079/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index 9f030da..0efc400 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -60,6 +60,10 @@ public class TransportConf {
     SPARK_NETWORK_IO_LAZYFD_KEY = getConfKey("io.lazyFD");
   }
 
+  public int getInt(String name, int defaultValue) {
+    return conf.getInt(name, defaultValue);
+  }
+
   private String getConfKey(String suffix) {
     return "spark." + module + "." + suffix;
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/9c15d079/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index 7eefcca..56cf1e2 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -21,6 +21,7 @@ import java.io.*;
 import java.nio.charset.StandardCharsets;
 import java.util.*;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 
@@ -29,6 +30,9 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Maps;
 import org.fusesource.leveldbjni.JniDBFactory;
 import org.fusesource.leveldbjni.internal.NativeDB;
@@ -66,6 +70,12 @@ public class ExternalShuffleBlockResolver {
   @VisibleForTesting
   final ConcurrentMap<AppExecId, ExecutorShuffleInfo> executors;
 
+  /**
+   *  Caches index file information so that we can avoid open/close the index files
+   *  for each block fetch.
+   */
+  private final LoadingCache<File, ShuffleIndexInformation> shuffleIndexCache;
+
   // Single-threaded Java executor used to perform expensive recursive directory deletion.
   private final Executor directoryCleaner;
 
@@ -95,6 +105,15 @@ public class ExternalShuffleBlockResolver {
       Executor directoryCleaner) throws IOException {
     this.conf = conf;
     this.registeredExecutorFile = registeredExecutorFile;
+    int indexCacheEntries = conf.getInt("spark.shuffle.service.index.cache.entries", 1024);
+    CacheLoader<File, ShuffleIndexInformation> indexCacheLoader =
+        new CacheLoader<File, ShuffleIndexInformation>() {
+          public ShuffleIndexInformation load(File file) throws IOException {
+            return new ShuffleIndexInformation(file);
+          }
+        };
+    shuffleIndexCache = CacheBuilder.newBuilder()
+                                    .maximumSize(indexCacheEntries).build(indexCacheLoader);
     if (registeredExecutorFile != null) {
       Options options = new Options();
       options.createIfMissing(false);
@@ -265,24 +284,17 @@ public class ExternalShuffleBlockResolver {
     File indexFile = getFile(executor.localDirs, executor.subDirsPerLocalDir,
       "shuffle_" + shuffleId + "_" + mapId + "_0.index");
 
-    DataInputStream in = null;
     try {
-      in = new DataInputStream(new FileInputStream(indexFile));
-      in.skipBytes(reduceId * 8);
-      long offset = in.readLong();
-      long nextOffset = in.readLong();
+      ShuffleIndexInformation shuffleIndexInformation = shuffleIndexCache.get(indexFile);
+      ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(reduceId);
       return new FileSegmentManagedBuffer(
         conf,
         getFile(executor.localDirs, executor.subDirsPerLocalDir,
           "shuffle_" + shuffleId + "_" + mapId + "_0.data"),
-        offset,
-        nextOffset - offset);
-    } catch (IOException e) {
+        shuffleIndexRecord.getOffset(),
+        shuffleIndexRecord.getLength());
+    } catch (ExecutionException e) {
       throw new RuntimeException("Failed to open file: " + indexFile, e);
-    } finally {
-      if (in != null) {
-        JavaUtils.closeQuietly(in);
-      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9c15d079/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
new file mode 100644
index 0000000..f1ff44a
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import com.google.common.cache.LoadingCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.nio.ch.IOUtil;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.LongBuffer;
+
+/**
+ * Keeps the index information for a particular map output
+ * as an in-memory LongBuffer.
+ */
+public class ShuffleIndexInformation {
+  /** offsets as long buffer */
+  private final LongBuffer offsets;
+
+  public ShuffleIndexInformation(File indexFile) throws IOException {
+    int size = (int)indexFile.length();
+    ByteBuffer buffer = ByteBuffer.allocate(size);
+    offsets = buffer.asLongBuffer();
+    DataInputStream dis = null;
+    try {
+      dis = new DataInputStream(new FileInputStream(indexFile));
+      dis.readFully(buffer.array());
+    } finally {
+      if (dis != null) {
+        dis.close();
+      }
+    }
+  }
+
+  /**
+   * Get index offset for a particular reducer.
+   */
+  public ShuffleIndexRecord getIndex(int reduceId) {
+    long offset = offsets.get(reduceId);
+    long nextOffset = offsets.get(reduceId + 1);
+    return new ShuffleIndexRecord(offset, nextOffset - offset);
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9c15d079/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexRecord.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexRecord.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexRecord.java
new file mode 100644
index 0000000..6a4fac1
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexRecord.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+/**
+ * Contains offset and length of the shuffle block data.
+ */
+public class ShuffleIndexRecord {
+  private final long offset;
+  private final long length;
+
+  public ShuffleIndexRecord(long offset, long length) {
+    this.offset = offset;
+    this.length = length;
+  }
+
+  public long getOffset() {
+    return offset;
+  }
+
+  public long getLength() {
+    return length;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/9c15d079/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index bf10b24..cc6b2b6 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -522,6 +522,13 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
+  <td><code>spark.shuffle.service.index.cache.entries</code></td>
+  <td>1024</td>
+  <td>
+    Max number of entries to keep in the index cache of the shuffle service.
+  </td>
+</tr>
+<tr>
   <td><code>spark.shuffle.sort.bypassMergeThreshold</code></td>
   <td>200</td>
   <td>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org