You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2023/06/13 03:07:10 UTC
[incubator-uniffle] branch master updated: [#854][FOLLOWUP] feat(tez): Add Simple Fetched Allocator to allocation memory or disk for shuffle data (#922)
This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new fb394e5e [#854][FOLLOWUP] feat(tez): Add Simple Fetched Allocator to allocation memory or disk for shuffle data (#922)
fb394e5e is described below
commit fb394e5ee8b40c8ee176eaf8fc2e000e072a247f
Author: Qing <11...@qq.com>
AuthorDate: Tue Jun 13 11:07:05 2023 +0800
[#854][FOLLOWUP] feat(tez): Add Simple Fetched Allocator to allocation memory or disk for shuffle data (#922)
### What changes were proposed in this pull request?
Add a Simple Fetched Allocator to allocate memory or disk for shuffle data
### Why are the changes needed?
Fix: https://github.com/apache/incubator-uniffle/issues/854
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
unit test case
---
.../impl/RssSimpleFetchedInputAllocator.java | 201 +++++++++++++++++++++
.../impl/RssSimpleFetchedInputAllocatorTest.java | 87 +++++++++
2 files changed, 288 insertions(+)
diff --git a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssSimpleFetchedInputAllocator.java b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssSimpleFetchedInputAllocator.java
new file mode 100644
index 00000000..a4298eb1
--- /dev/null
+++ b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssSimpleFetchedInputAllocator.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.shuffle.impl;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.Constants;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.shuffle.DiskFetchedInput;
+import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
+import org.apache.tez.runtime.library.common.shuffle.FetchedInput.Type;
+import org.apache.tez.runtime.library.common.shuffle.MemoryFetchedInput;
+import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.exception.RssException;
+
+/**
+ * Fetched-input allocator for the Uniffle (RSS) Tez client. For every fetched block it decides
+ * whether the data is kept in memory ({@link MemoryFetchedInput}) or written to local disk
+ * ({@link DiskFetchedInput}), and tracks how much of the assigned memory is currently reserved.
+ *
+ * <p>Usage: construct an instance with the memory assigned to this input ({@code memoryAvailable}),
+ * then call {@link #allocate} or {@link #allocateType} for each block to fetch. Memory reserved for
+ * a {@link MemoryFetchedInput} is returned through {@link #fetchFailed} or {@link #freeResources}.
+ *
+ * <p>The allocation and release methods are {@code synchronized} on this instance.
+ */
+@Private
+public class RssSimpleFetchedInputAllocator extends SimpleFetchedInputAllocator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RssSimpleFetchedInputAllocator.class);
+
+  private final Configuration conf;
+
+  // Used to place DiskFetchedInput files in the task's local directories.
+  private final TezTaskOutputFiles fileNameAllocator;
+  private final LocalDirAllocator localDirAllocator;
+
+  // Total bytes that may be reserved for in-memory inputs at once.
+  private final long memoryLimit;
+  // Largest single input (in bytes) that may be placed in memory.
+  private final long maxSingleShuffleLimit;
+
+  // Constructor arguments, kept as supplied by the caller.
+  private final long maxAvailableTaskMemory;
+  private final long initialMemoryAvailable;
+
+  private final String srcNameTrimmed;
+
+  // Bytes currently reserved by MemoryFetchedInputs; only mutated while holding this monitor.
+  private volatile long usedMemory = 0;
+
+  /**
+   * Creates the allocator and derives its memory limits from {@code conf}.
+   *
+   * @param srcNameTrimmed short source name used as a log prefix
+   * @param uniqueIdentifier identifier used to build local output file names
+   * @param dagID id of the DAG this input belongs to
+   * @param conf runtime configuration
+   * @param maxTaskAvailableMemory task memory used when {@code Constants.TEZ_RUNTIME_TASK_MEMORY} is unset
+   * @param memoryAvailable memory actually assigned to this input; upper bound for {@code memoryLimit}
+   * @throws RssException if the fetch-buffer or single-shuffle percentages are out of range
+   */
+  public RssSimpleFetchedInputAllocator(String srcNameTrimmed,
+      String uniqueIdentifier, int dagID,
+      Configuration conf,
+      long maxTaskAvailableMemory,
+      long memoryAvailable) {
+    super(srcNameTrimmed, uniqueIdentifier, dagID, conf, maxTaskAvailableMemory, memoryAvailable);
+    this.srcNameTrimmed = srcNameTrimmed;
+    this.conf = conf;
+    this.maxAvailableTaskMemory = maxTaskAvailableMemory;
+    this.initialMemoryAvailable = memoryAvailable;
+
+    this.fileNameAllocator = new TezTaskOutputFiles(conf, uniqueIdentifier, dagID);
+    this.localDirAllocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
+
+    // Same validation and computation as the static helper; reuse it instead of duplicating it.
+    final long memReq = getInitialMemoryReq(conf, maxTaskAvailableMemory);
+    // Never use more memory than was actually assigned to this input.
+    this.memoryLimit = Math.min(memReq, initialMemoryAvailable);
+
+    final float singleShuffleMemoryLimitPercent = conf.getFloat(
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT_DEFAULT);
+    if (singleShuffleMemoryLimitPercent <= 0.0f
+        || singleShuffleMemoryLimitPercent > 1.0f) {
+      throw new RssException("Invalid value for "
+          + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT + ": " + singleShuffleMemoryLimitPercent);
+    }
+    this.maxSingleShuffleLimit = (long) Math.min((memoryLimit * singleShuffleMemoryLimitPercent), Integer.MAX_VALUE);
+
+    LOG.info("{}: RequestedMemory={}, AssignedMemory={}, maxSingleShuffleLimit={}",
+        srcNameTrimmed, memReq, this.memoryLimit, this.maxSingleShuffleLimit);
+  }
+
+  /**
+   * Computes the memory this allocator would like to use: the task memory (from
+   * {@code Constants.TEZ_RUNTIME_TASK_MEMORY}, defaulting to {@code maxAvailableTaskMemory} capped at
+   * {@code Integer.MAX_VALUE}) multiplied by the fetch buffer percentage.
+   *
+   * @param conf runtime configuration
+   * @param maxAvailableTaskMemory fallback task memory
+   * @return requested memory in bytes
+   * @throws RssException if the fetch buffer percentage is outside [0.0, 1.0]
+   */
+  @Private
+  public static long getInitialMemoryReq(Configuration conf, long maxAvailableTaskMemory) {
+    final float maxInMemCopyUse = conf.getFloat(
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT_DEFAULT);
+    if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
+      throw new RssException("Invalid value for "
+          + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT + ": " + maxInMemCopyUse);
+    }
+    return (long) (conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY,
+        Math.min(maxAvailableTaskMemory, Integer.MAX_VALUE)) * maxInMemCopyUse);
+  }
+
+  /**
+   * Allocates a destination for a block of {@code actualSize} bytes. The block goes to memory if it
+   * is not larger than {@code maxSingleShuffleLimit} and still fits under {@code memoryLimit};
+   * otherwise it goes to disk.
+   *
+   * @param actualSize size of the block in bytes
+   * @param compressedSize not used by this method
+   * @param inputAttemptIdentifier the input attempt being fetched
+   * @return a {@link MemoryFetchedInput} (memory reserved) or a {@link DiskFetchedInput}
+   * @throws IOException if the disk input cannot be created
+   */
+  @Override
+  public synchronized FetchedInput allocate(long actualSize, long compressedSize,
+      InputAttemptIdentifier inputAttemptIdentifier) throws IOException {
+    if (actualSize > maxSingleShuffleLimit
+        || this.usedMemory + actualSize > this.memoryLimit) {
+      LOG.info("Allocate DiskFetchedInput, length:{}", actualSize);
+      // NOTE(review): 8 extra bytes are requested on disk; their purpose is not visible here — confirm.
+      return new DiskFetchedInput(actualSize + 8, inputAttemptIdentifier, this, conf,
+          localDirAllocator, fileNameAllocator);
+    }
+    this.usedMemory += actualSize;
+    // Previously guarded by isDebugEnabled() but emitted at INFO; now consistently DEBUG.
+    LOG.debug("{}: Used memory after allocating {} : {}", srcNameTrimmed, actualSize, usedMemory);
+    return new MemoryFetchedInput(actualSize, inputAttemptIdentifier, this);
+  }
+
+  /**
+   * Allocates a destination of the requested type. {@code DISK} always yields a
+   * {@link DiskFetchedInput} sized {@code compressedSize + 8}; every other type is delegated to
+   * {@link #allocate}.
+   */
+  @Override
+  public synchronized FetchedInput allocateType(Type type, long actualSize,
+      long compressedSize, InputAttemptIdentifier inputAttemptIdentifier)
+      throws IOException {
+
+    switch (type) {
+      case DISK:
+        LOG.info("AllocateType DiskFetchedInput, compressedSize:{}", compressedSize);
+        return new DiskFetchedInput(compressedSize + 8, inputAttemptIdentifier, this,
+            conf, localDirAllocator, fileNameAllocator);
+      default:
+        return allocate(actualSize, compressedSize, inputAttemptIdentifier);
+    }
+  }
+
+  /**
+   * Called when a fetch completes. Nothing is tracked on completion; only the input type is checked.
+   *
+   * @throws RssException for input types other than DISK, DISK_DIRECT and MEMORY
+   */
+  @Override
+  public synchronized void fetchComplete(FetchedInput fetchedInput) {
+    switch (fetchedInput.getType()) {
+      // Not tracking anything here.
+      case DISK:
+      case DISK_DIRECT:
+      case MEMORY:
+        break;
+      default:
+        throw new RssException("InputType: " + fetchedInput.getType() + " not expected for Broadcast fetch");
+    }
+  }
+
+  /** Called when a fetch fails; releases any memory reserved for the input. */
+  @Override
+  public synchronized void fetchFailed(FetchedInput fetchedInput) {
+    cleanup(fetchedInput);
+  }
+
+  /** Called when an input is freed; releases any memory reserved for the input. */
+  @Override
+  public synchronized void freeResources(FetchedInput fetchedInput) {
+    cleanup(fetchedInput);
+  }
+
+  /**
+   * Returns the reservation of a MEMORY input to the pool; DISK inputs hold no reservation.
+   *
+   * @throws RssException for any other input type
+   */
+  private void cleanup(FetchedInput fetchedInput) {
+    switch (fetchedInput.getType()) {
+      case DISK:
+        break;
+      case MEMORY:
+        unreserve(((MemoryFetchedInput) fetchedInput).getSize());
+        break;
+      default:
+        throw new RssException("InputType: " + fetchedInput.getType() + " not expected for Broadcast fetch");
+    }
+  }
+
+  /** Subtracts {@code size} bytes from the in-use memory counter. */
+  private synchronized void unreserve(long size) {
+    this.usedMemory -= size;
+    LOG.debug("{}: Used memory after freeing {} : {}", srcNameTrimmed, size, usedMemory);
+  }
+}
diff --git a/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/RssSimpleFetchedInputAllocatorTest.java b/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/RssSimpleFetchedInputAllocatorTest.java
new file mode 100644
index 00000000..b5bfaf2c
--- /dev/null
+++ b/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/RssSimpleFetchedInputAllocatorTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.shuffle.impl;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.shuffle.DiskFetchedInput;
+import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedConstruction;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class RssSimpleFetchedInputAllocatorTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RssSimpleFetchedInputAllocatorTest.class);
+
+  /**
+   * Inputs are placed in memory while they fit under the memory limit and go to disk once it is
+   * reached; releasing a memory input makes in-memory allocation possible again, while releasing a
+   * disk input does not.
+   */
+  @Test
+  public void testAllocate() throws IOException {
+    Configuration conf = new Configuration();
+
+    // Fixed task memory, also passed to the allocator, so the limits do not depend on the test JVM's -Xmx.
+    long jvmMax = 954728448L;
+    LOG.info("jvmMax: {}", jvmMax);
+
+    float bufferPercent = 0.1f;
+    conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT, bufferPercent);
+    conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, 1.0f);
+
+    long inMemThreshold = (long) (bufferPercent * jvmMax);
+    LOG.info("InMemThreshold: {}", inMemThreshold);
+
+    RssSimpleFetchedInputAllocator inputManager = new RssSimpleFetchedInputAllocator(
+        "srcName", UUID.randomUUID().toString(), 123, conf, jvmMax, inMemThreshold);
+
+    long requestSize = (long) (0.4f * inMemThreshold);
+    long compressedSize = 1L;
+    LOG.info("RequestSize: {}", requestSize);
+
+    FetchedInput fi1 = inputManager.allocate(requestSize, compressedSize, new InputAttemptIdentifier(1, 1));
+    assertEquals(FetchedInput.Type.MEMORY, fi1.getType());
+
+    FetchedInput fi2 = inputManager.allocate(requestSize, compressedSize, new InputAttemptIdentifier(2, 1));
+    assertEquals(FetchedInput.Type.MEMORY, fi2.getType());
+
+    // A real DiskFetchedInput needs configured local dirs, so its construction is mocked. The mock must
+    // report DISK (a bare mock returns null from getType()), and the MockedConstruction must be closed
+    // so it does not leak into other tests running on this thread.
+    try (MockedConstruction<DiskFetchedInput> mockedConstruction =
+        Mockito.mockConstruction(DiskFetchedInput.class, (mockedInput, context) ->
+            Mockito.when(mockedInput.getType()).thenReturn(FetchedInput.Type.DISK))) {
+      // Over limit by this point. Next reserve should give back a DISK allocation
+      FetchedInput fi3 = inputManager.allocate(requestSize, compressedSize, new InputAttemptIdentifier(3, 1));
+      assertEquals(FetchedInput.Type.DISK, fi3.getType());
+      assertEquals(1, mockedConstruction.constructed().size());
+
+      // Freed one memory allocation. Next should be mem again.
+      fi1.abort();
+      fi1.free();
+      FetchedInput fi4 = inputManager.allocate(requestSize, compressedSize, new InputAttemptIdentifier(4, 1));
+      assertEquals(FetchedInput.Type.MEMORY, fi4.getType());
+
+      // Freed one disk allocation. Next should be disk again (no mem freed)
+      fi3.abort();
+      fi3.free();
+      FetchedInput fi5 = inputManager.allocate(requestSize, compressedSize, new InputAttemptIdentifier(5, 1));
+      assertEquals(FetchedInput.Type.DISK, fi5.getType());
+      assertEquals(2, mockedConstruction.constructed().size());
+    }
+  }
+}