You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2023/06/13 03:07:10 UTC

[incubator-uniffle] branch master updated: [#854][FOLLOWUP] feat(tez): Add Simple Fetched Allocator to allocation memory or disk for shuffle data (#922)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new fb394e5e [#854][FOLLOWUP] feat(tez): Add Simple Fetched Allocator to allocation memory or disk for shuffle data (#922)
fb394e5e is described below

commit fb394e5ee8b40c8ee176eaf8fc2e000e072a247f
Author: Qing <11...@qq.com>
AuthorDate: Tue Jun 13 11:07:05 2023 +0800

    [#854][FOLLOWUP] feat(tez): Add Simple Fetched Allocator to allocation memory or disk for shuffle data (#922)
    
    ### What changes were proposed in this pull request?
    Add Simple Fetched Allocator to allocate memory or disk for shuffle data
    
    ### Why are the changes needed?
    Fix: https://github.com/apache/incubator-uniffle/issues/854
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    unit test case
---
 .../impl/RssSimpleFetchedInputAllocator.java       | 201 +++++++++++++++++++++
 .../impl/RssSimpleFetchedInputAllocatorTest.java   |  87 +++++++++
 2 files changed, 288 insertions(+)

diff --git a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssSimpleFetchedInputAllocator.java b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssSimpleFetchedInputAllocator.java
new file mode 100644
index 00000000..a4298eb1
--- /dev/null
+++ b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssSimpleFetchedInputAllocator.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.shuffle.impl;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.Constants;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.shuffle.DiskFetchedInput;
+import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
+import org.apache.tez.runtime.library.common.shuffle.FetchedInput.Type;
+import org.apache.tez.runtime.library.common.shuffle.MemoryFetchedInput;
+import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.exception.RssException;
+
+/**
+ * Allocates fetched shuffle inputs in memory when they fit under the configured limits and on
+ * local disk otherwise. Construct with the granted memory; no further setup call is needed.
+ * Allocation and release methods synchronize on this instance.
+ */
+@Private
+public class RssSimpleFetchedInputAllocator extends SimpleFetchedInputAllocator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RssSimpleFetchedInputAllocator.class);
+
+  private final Configuration conf;
+
+  private final TezTaskOutputFiles fileNameAllocator;
+  private final LocalDirAllocator localDirAllocator;
+
+  // Configuration parameters
+  private final long memoryLimit;
+  private final long maxSingleShuffleLimit;
+
+  private final long maxAvailableTaskMemory;
+  private final long initialMemoryAvailable;
+
+  private final String srcNameTrimmed;
+
+  // Bytes currently handed out as MemoryFetchedInput; only updated while holding this lock.
+  private volatile long usedMemory = 0;
+
+  /**
+   * Derives the in-memory limits from {@code conf}, capped by {@code memoryAvailable}.
+   *
+   * @throws RssException if the fetch-buffer or single-shuffle memory percentage is invalid
+   */
+  public RssSimpleFetchedInputAllocator(String srcNameTrimmed,
+                                     String uniqueIdentifier, int dagID,
+                                     Configuration conf,
+                                     long maxTaskAvailableMemory,
+                                     long memoryAvailable) {
+    super(srcNameTrimmed, uniqueIdentifier, dagID, conf, maxTaskAvailableMemory, memoryAvailable);
+    this.srcNameTrimmed = srcNameTrimmed;
+    this.conf = conf;
+    this.maxAvailableTaskMemory = maxTaskAvailableMemory;
+    this.initialMemoryAvailable = memoryAvailable;
+
+    this.fileNameAllocator = new TezTaskOutputFiles(conf, uniqueIdentifier, dagID);
+    this.localDirAllocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
+
+    // Requested in-memory budget (this also validates the fetch-buffer percentage),
+    // capped by the memory that was actually granted.
+    final long memReq = getInitialMemoryReq(conf, maxAvailableTaskMemory);
+    this.memoryLimit = Math.min(memReq, initialMemoryAvailable);
+
+    final float singleShuffleMemoryLimitPercent = conf.getFloat(
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT_DEFAULT);
+    if (singleShuffleMemoryLimitPercent <= 0.0f
+        || singleShuffleMemoryLimitPercent > 1.0f) {
+      throw new RssException("Invalid value for "
+          + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT + ": " + singleShuffleMemoryLimitPercent);
+    }
+    // Largest single input kept in memory: a share of memoryLimit, never above Integer.MAX_VALUE.
+    this.maxSingleShuffleLimit = (long) Math.min((memoryLimit * singleShuffleMemoryLimitPercent), Integer.MAX_VALUE);
+
+    LOG.info("{}: RequestedMemory={}, AssignedMemory={}, maxSingleShuffleLimit={}",
+        srcNameTrimmed, memReq, this.memoryLimit, this.maxSingleShuffleLimit);
+  }
+
+  /**
+   * Returns task memory (default {@code min(maxAvailableTaskMemory, Integer.MAX_VALUE)}) times the
+   * fetch-buffer percentage; throws RssException if that percentage is outside [0.0, 1.0].
+   */
+  @Private
+  public static long getInitialMemoryReq(Configuration conf, long maxAvailableTaskMemory) {
+    final float maxInMemCopyUse = conf.getFloat(
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT_DEFAULT);
+    if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
+      throw new RssException("Invalid value for "
+          + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT + ": " + maxInMemCopyUse);
+    }
+    return (long) (conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY,
+        Math.min(maxAvailableTaskMemory, Integer.MAX_VALUE)) * maxInMemCopyUse);
+  }
+
+  /**
+   * Returns a {@link MemoryFetchedInput} when {@code actualSize} fits the single-input and total
+   * memory limits, otherwise a {@link DiskFetchedInput}.
+   */
+  @Override
+  public synchronized FetchedInput allocate(long actualSize, long compressedSize,
+            InputAttemptIdentifier inputAttemptIdentifier) throws IOException {
+    if (actualSize > maxSingleShuffleLimit
+        || this.usedMemory + actualSize > this.memoryLimit) {
+      LOG.info("Allocate DiskFetchedInput, length:{}", actualSize);
+      // NOTE(review): the disk input is sized 8 bytes above the payload — confirm why.
+      return new DiskFetchedInput(actualSize + 8, inputAttemptIdentifier, this, conf,
+          localDirAllocator, fileNameAllocator);
+    }
+    this.usedMemory += actualSize;
+    LOG.debug("{}: Used memory after allocating {} : {}", srcNameTrimmed, actualSize, usedMemory);
+    return new MemoryFetchedInput(actualSize, inputAttemptIdentifier, this);
+  }
+
+  /** Allocates on disk for {@code Type.DISK}; any other type falls back to {@link #allocate}. */
+  @Override
+  public synchronized FetchedInput allocateType(Type type, long actualSize,
+      long compressedSize, InputAttemptIdentifier inputAttemptIdentifier)
+      throws IOException {
+    switch (type) {
+      case DISK:
+        LOG.info("AllocateType DiskFetchedInput, compressedSize:{}", compressedSize);
+        return new DiskFetchedInput(compressedSize + 8, inputAttemptIdentifier, this,
+            conf, localDirAllocator, fileNameAllocator);
+      default:
+        return allocate(actualSize, compressedSize, inputAttemptIdentifier);
+    }
+  }
+
+  /** Nothing is tracked on completion; rejects input types this allocator does not expect. */
+  @Override
+  public synchronized void fetchComplete(FetchedInput fetchedInput) {
+    switch (fetchedInput.getType()) {
+      // Not tracking anything here.
+      case DISK:
+      case DISK_DIRECT:
+      case MEMORY:
+        break;
+      default:
+        throw new RssException("InputType: " + fetchedInput.getType() + " not expected for Broadcast fetch");
+    }
+  }
+
+  /** Releases any memory reserved for a failed fetch. */
+  @Override
+  public synchronized void fetchFailed(FetchedInput fetchedInput) {
+    cleanup(fetchedInput);
+  }
+
+  /** Releases any memory reserved for an input that is being freed. */
+  @Override
+  public synchronized void freeResources(FetchedInput fetchedInput) {
+    cleanup(fetchedInput);
+  }
+
+  // Returns reserved memory for MEMORY inputs; DISK and DISK_DIRECT (as in fetchComplete) hold none.
+  private void cleanup(FetchedInput fetchedInput) {
+    switch (fetchedInput.getType()) {
+      case DISK:
+      case DISK_DIRECT:
+        break;
+      case MEMORY:
+        unreserve(((MemoryFetchedInput) fetchedInput).getSize());
+        break;
+      default:
+        throw new RssException("InputType: " + fetchedInput.getType() + " not expected for Broadcast fetch");
+    }
+  }
+
+  private synchronized void unreserve(long size) {
+    this.usedMemory -= size;
+    LOG.debug("{}: Used memory after freeing {} : {}", srcNameTrimmed, size, usedMemory);
+  }
+}
diff --git a/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/RssSimpleFetchedInputAllocatorTest.java b/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/RssSimpleFetchedInputAllocatorTest.java
new file mode 100644
index 00000000..b5bfaf2c
--- /dev/null
+++ b/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/RssSimpleFetchedInputAllocatorTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.shuffle.impl;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.shuffle.DiskFetchedInput;
+import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedConstruction;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class RssSimpleFetchedInputAllocatorTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RssSimpleFetchedInputAllocatorTest.class);
+
+  @Test
+  public void testAllocate() throws IOException {
+    Configuration conf = new Configuration();
+
+    long jvmMax = 954728448L; // fixed (and passed below) so limits do not depend on the test JVM's -Xmx
+    LOG.info("jvmMax: {}", jvmMax);
+
+    float bufferPercent = 0.1f;
+    conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT, bufferPercent);
+    conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, 1.0f);
+
+    long inMemThreshold = (long) (bufferPercent * jvmMax);
+    LOG.info("InMemThreshold: {}", inMemThreshold);
+
+    RssSimpleFetchedInputAllocator inputManager = new RssSimpleFetchedInputAllocator(
+        "srcName", UUID.randomUUID().toString(), 123, conf, jvmMax, inMemThreshold);
+
+    long requestSize = (long) (0.4f * inMemThreshold);
+    long compressedSize = 1L;
+    LOG.info("RequestSize: {}", requestSize);
+
+    FetchedInput fi1 = inputManager.allocate(requestSize, compressedSize, new InputAttemptIdentifier(1, 1));
+    assertEquals(FetchedInput.Type.MEMORY, fi1.getType());
+
+    FetchedInput fi2 = inputManager.allocate(requestSize, compressedSize, new InputAttemptIdentifier(2, 1));
+    assertEquals(FetchedInput.Type.MEMORY, fi2.getType());
+
+    // Stub DiskFetchedInput (avoid real local-disk setup); run the checks in scope, then close it.
+    try (MockedConstruction<DiskFetchedInput> ignored = Mockito.mockConstruction(DiskFetchedInput.class,
+        (mockedInput, context) -> Mockito.when(mockedInput.getType()).thenReturn(FetchedInput.Type.DISK))) {
+      // Over limit by this point. Next reserve should give back a DISK allocation
+      FetchedInput fi3 = inputManager.allocate(requestSize, compressedSize, new InputAttemptIdentifier(3, 1));
+      assertEquals(FetchedInput.Type.DISK, fi3.getType());
+
+      // Freed one memory allocation. Next should be mem again.
+      fi1.abort();
+      fi1.free();
+      FetchedInput fi4 = inputManager.allocate(requestSize, compressedSize, new InputAttemptIdentifier(4, 1));
+      assertEquals(FetchedInput.Type.MEMORY, fi4.getType());
+
+      // Freed one disk allocation. Next should be disk again (no mem freed)
+      fi3.abort();
+      fi3.free();
+      FetchedInput fi5 = inputManager.allocate(requestSize, compressedSize, new InputAttemptIdentifier(5, 1));
+      assertEquals(FetchedInput.Type.DISK, fi5.getType());
+    }
+  }
+}