You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2023/05/16 06:02:14 UTC
[incubator-uniffle] branch master updated: [MINOR] Remove unused code of shuffle upload (#883)
This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 40e75122 [MINOR] Remove unused code of shuffle upload (#883)
40e75122 is described below
commit 40e75122c279adcb3f59662fc53cd164089e540b
Author: roryqi <ro...@apache.org>
AuthorDate: Tue May 16 14:02:09 2023 +0800
[MINOR] Remove unused code of shuffle upload (#883)
### What changes were proposed in this pull request?
1. Remove the meta lock; this lock is used for the upload shuffle function.
2. Remove the meta of the upload shuffle function.
### Why are the changes needed?
Simplify our logic
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
GA passed.
---
.../server/storage/SingleStorageManager.java | 11 ---
.../apache/uniffle/storage/common/HdfsStorage.java | 10 ---
.../uniffle/storage/common/LocalStorage.java | 69 ++----------------
.../uniffle/storage/common/LocalStorageMeta.java | 82 ----------------------
.../org/apache/uniffle/storage/common/Storage.java | 4 --
.../uniffle/storage/common/LocalStorageTest.java | 82 ----------------------
6 files changed, 7 insertions(+), 251 deletions(-)
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
index 2acc49c4..9bda5f25 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
@@ -55,15 +55,6 @@ public abstract class SingleStorageManager implements StorageManager {
public boolean write(Storage storage, ShuffleWriteHandler handler, ShuffleDataFlushEvent event) {
String shuffleKey = RssUtils.generateShuffleKey(event.getAppId(), event.getShuffleId());
storage.createMetadataIfNotExist(shuffleKey);
-
- boolean locked = storage.lockShuffleShared(shuffleKey);
- if (!locked) {
- LOG.warn("AppId {} shuffleId {} was removed already, lock don't exist {} should be dropped,"
- + " may leak one handler", event.getAppId(), event.getShuffleId(), event);
- throw new IllegalStateException("AppId " + event.getAppId() + " ShuffleId " + event.getShuffleId()
- + " was removed");
- }
-
try {
long startWrite = System.currentTimeMillis();
handler.write(event.getShuffleBlocks());
@@ -74,8 +65,6 @@ public abstract class SingleStorageManager implements StorageManager {
LOG.warn("Exception happened when write data for " + event + ", try again", e);
ShuffleServerMetrics.counterWriteException.inc();
Uninterruptibles.sleepUninterruptibly(1000, TimeUnit.MILLISECONDS);
- } finally {
- storage.unlockShuffleShared(shuffleKey);
}
return false;
}
diff --git a/storage/src/main/java/org/apache/uniffle/storage/common/HdfsStorage.java b/storage/src/main/java/org/apache/uniffle/storage/common/HdfsStorage.java
index 4b023d1e..118686a7 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/HdfsStorage.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/HdfsStorage.java
@@ -67,16 +67,6 @@ public class HdfsStorage extends AbstractStorage {
return true;
}
- @Override
- public boolean lockShuffleShared(String shuffleKey) {
- return true;
- }
-
- @Override
- public boolean unlockShuffleShared(String shuffleKey) {
- return true;
- }
-
@Override
public void updateReadMetrics(StorageReadMetrics metrics) {
// do nothing
diff --git a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
index b5ca09e8..6b564d57 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
@@ -24,11 +24,9 @@ import java.nio.file.Files;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.locks.ReadWriteLock;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.io.FileUtils;
-import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -101,26 +99,6 @@ public class LocalStorage extends AbstractStorage {
return STORAGE_HOST;
}
- @Override
- public boolean lockShuffleShared(String shuffleKey) {
- ReadWriteLock lock = getLock(shuffleKey);
- if (lock == null) {
- return false;
- }
- lock.readLock().lock();
- return true;
- }
-
- @Override
- public boolean unlockShuffleShared(String shuffleKey) {
- ReadWriteLock lock = getLock(shuffleKey);
- if (lock == null) {
- return false;
- }
- lock.readLock().unlock();
- return true;
- }
-
@Override
public void updateWriteMetrics(StorageWriteMetrics metrics) {
updateWrite(RssUtils.generateShuffleKey(metrics.getAppId(), metrics.getShuffleId()),
@@ -189,15 +167,6 @@ public class LocalStorage extends AbstractStorage {
metaData.updateShuffleLastReadTs(shuffleKey);
}
- public RoaringBitmap getNotUploadedPartitions(String key) {
- return metaData.getNotUploadedPartitions(key);
- }
-
- public void updateUploadedShuffle(String shuffleKey, long size, List<Integer> partitions) {
- metaData.updateUploadedShuffleSize(shuffleKey, size);
- metaData.addUploadedShufflePartitionList(shuffleKey, partitions);
- }
-
public long getDiskSize() {
return metaData.getDiskSize().longValue();
}
@@ -226,40 +195,16 @@ public class LocalStorage extends AbstractStorage {
// add the shuffle key back to the expiredShuffleKeys if get lock but fail to acquire write lock.
public void removeResources(String shuffleKey) {
LOG.info("Start to remove resource of {}", shuffleKey);
- ReadWriteLock lock = metaData.getLock(shuffleKey);
- if (lock == null) {
- LOG.info("Ignore shuffle {} for its resource was removed already", shuffleKey);
- return;
- }
-
- if (lock.writeLock().tryLock()) {
- try {
- metaData.updateDiskSize(-metaData.getShuffleSize(shuffleKey));
- metaData.remoteShuffle(shuffleKey);
- LOG.info("Finish remove resource of {}, disk size is {} and {} shuffle metadata",
- shuffleKey, metaData.getDiskSize(), metaData.getShuffleMetaSet().size());
- } catch (Exception e) {
- LOG.error("Fail to update disk size", e);
- } finally {
- lock.writeLock().unlock();
- }
- } else {
- LOG.info("Fail to get write lock of {}, add it back to expired shuffle queue", shuffleKey);
+ try {
+ metaData.updateDiskSize(-metaData.getShuffleSize(shuffleKey));
+ metaData.remoteShuffle(shuffleKey);
+ LOG.info("Finish remove resource of {}, disk size is {} and {} shuffle metadata",
+ shuffleKey, metaData.getDiskSize(), metaData.getShuffleMetaSet().size());
+ } catch (Exception e) {
+ LOG.error("Fail to update disk size", e);
}
}
- public ReadWriteLock getLock(String shuffleKey) {
- return metaData.getLock(shuffleKey);
- }
-
- public long getNotUploadedSize(String key) {
- return metaData.getNotUploadedSize(key);
- }
-
- public List<String> getSortedShuffleKeys(boolean checkRead, int num) {
- return metaData.getSortedShuffleKeys(checkRead, num);
- }
-
public boolean isCorrupted() {
return isCorrupted;
}
diff --git a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java
index e67cd87b..cc82a8e6 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java
@@ -19,13 +19,9 @@ package org.apache.uniffle.storage.common;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import org.roaringbitmap.RoaringBitmap;
@@ -47,52 +43,6 @@ public class LocalStorageMeta {
private final AtomicLong size = new AtomicLong(0L);
private final Map<String, ShuffleMeta> shuffleMetaMap = JavaUtils.newConcurrentMap();
- // todo: add ut
- public List<String> getSortedShuffleKeys(boolean checkRead, int hint) {
- // Filter the unread shuffle is checkRead is true
- // Filter the remaining size is 0
- List<Map.Entry<String, ShuffleMeta>> shuffleMetaList = shuffleMetaMap
- .entrySet()
- .stream()
- .filter(e -> (!checkRead || e.getValue().isStartRead.get()) && e.getValue().getNotUploadedSize() > 0)
- .collect(Collectors.toList());
-
- shuffleMetaList.sort((Entry<String, ShuffleMeta> o1, Entry<String, ShuffleMeta> o2) -> {
- long sz1 = o1.getValue().getSize().longValue();
- long sz2 = o2.getValue().getSize().longValue();
- return Long.compare(sz2, sz1);
- });
-
- return shuffleMetaList
- .subList(0, Math.min(shuffleMetaList.size(), hint))
- .stream()
- .map(Entry::getKey).collect(Collectors.toList());
- }
-
- public RoaringBitmap getNotUploadedPartitions(String shuffleKey) {
- ShuffleMeta shuffleMeta = getShuffleMeta(shuffleKey);
- if (shuffleMeta == null) {
- return RoaringBitmap.bitmapOf();
- }
-
- RoaringBitmap partitionBitmap;
- RoaringBitmap uploadedPartitionBitmap;
- synchronized (shuffleMeta.partitionBitmap) {
- partitionBitmap = shuffleMeta.partitionBitmap.clone();
- }
- synchronized (shuffleMeta.uploadedPartitionBitmap) {
- uploadedPartitionBitmap = shuffleMeta.uploadedPartitionBitmap.clone();
- }
- for (int partition : uploadedPartitionBitmap) {
- partitionBitmap.remove(partition);
- }
- return partitionBitmap;
- }
-
- public long getNotUploadedSize(String shuffleKey) {
- ShuffleMeta shuffleMeta = getShuffleMeta(shuffleKey);
- return shuffleMeta == null ? 0 : shuffleMeta.getNotUploadedSize();
- }
public void updateDiskSize(long delta) {
size.addAndGet(delta);
@@ -105,13 +55,6 @@ public class LocalStorageMeta {
}
}
- public void updateUploadedShuffleSize(String shuffleKey, long delta) {
- ShuffleMeta shuffleMeta = getShuffleMeta(shuffleKey);
- if (shuffleMeta != null) {
- shuffleMeta.uploadedSize.addAndGet(delta);
- }
- }
-
public void addShufflePartitionList(String shuffleKey, List<Integer> partitions) {
ShuffleMeta shuffleMeta = getShuffleMeta(shuffleKey);
if (shuffleMeta != null) {
@@ -122,16 +65,6 @@ public class LocalStorageMeta {
}
}
- public void addUploadedShufflePartitionList(String shuffleKey, List<Integer> partitions) {
- ShuffleMeta shuffleMeta = getShuffleMeta(shuffleKey);
- if (shuffleMeta != null) {
- RoaringBitmap bitmap = shuffleMeta.uploadedPartitionBitmap;
- synchronized (bitmap) {
- partitions.forEach(bitmap::add);
- }
- }
- }
-
public void prepareStartRead(String shuffleId) {
ShuffleMeta shuffleMeta = getShuffleMeta(shuffleId);
if (shuffleMeta != null) {
@@ -199,29 +132,17 @@ public class LocalStorageMeta {
}
}
- public ReadWriteLock getLock(String shuffleKey) {
- ShuffleMeta shuffleMeta = getShuffleMeta(shuffleKey);
- return shuffleMeta == null ? null : shuffleMeta.getLock();
- }
-
// Consider that ShuffleMeta is a simple class, we keep the class ShuffleMeta as an inner class.
private static class ShuffleMeta {
private final AtomicLong size = new AtomicLong(0);
private final RoaringBitmap partitionBitmap = RoaringBitmap.bitmapOf();
- private final AtomicLong uploadedSize = new AtomicLong(0);
- private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final AtomicBoolean isStartRead = new AtomicBoolean(false);
- private final RoaringBitmap uploadedPartitionBitmap = RoaringBitmap.bitmapOf();
private final AtomicLong lastReadTs = new AtomicLong(-1L);
public AtomicLong getSize() {
return size;
}
- public long getNotUploadedSize() {
- return size.longValue() - uploadedSize.longValue();
- }
-
public void markStartRead() {
isStartRead.set(true);
}
@@ -230,8 +151,5 @@ public class LocalStorageMeta {
lastReadTs.set(System.currentTimeMillis());
}
- public ReadWriteLock getLock() {
- return lock;
- }
}
}
diff --git a/storage/src/main/java/org/apache/uniffle/storage/common/Storage.java b/storage/src/main/java/org/apache/uniffle/storage/common/Storage.java
index f2fb9c61..452701f9 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/Storage.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/Storage.java
@@ -29,10 +29,6 @@ public interface Storage {
boolean canWrite();
- boolean lockShuffleShared(String shuffleKey);
-
- boolean unlockShuffleShared(String shuffleKey);
-
void updateWriteMetrics(StorageWriteMetrics metrics);
void updateReadMetrics(StorageReadMetrics metrics);
diff --git a/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java b/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java
index 3f1a17cd..de0f8e1e 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java
@@ -21,19 +21,15 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.PosixFilePermission;
-import java.util.List;
import java.util.Set;
-import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
-import org.roaringbitmap.RoaringBitmap;
import org.apache.uniffle.common.storage.StorageMedia;
-import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.storage.request.CreateShuffleWriteHandlerRequest;
import org.apache.uniffle.storage.util.StorageType;
@@ -130,84 +126,6 @@ public class LocalStorageTest {
createTestStorage(notExisted);
}
- @Test
- public void removeResourcesTest() throws Exception {
- LocalStorage item = prepareDiskItem();
- final String key1 = RssUtils.generateShuffleKey("1", 1);
- final String key2 = RssUtils.generateShuffleKey("1", 2);
- item.removeResources(key1);
- assertEquals(50L, item.getMetaData().getDiskSize().get());
- assertEquals(0L, item.getMetaData().getShuffleSize(key1));
- assertEquals(50L, item.getMetaData().getShuffleSize(key2));
- assertTrue(item.getMetaData().getNotUploadedPartitions(key1).isEmpty());
- }
-
- private LocalStorage prepareDiskItem() {
- final LocalStorage item = createTestStorage(testBaseDir);
- RoaringBitmap partitionBitMap = RoaringBitmap.bitmapOf();
- partitionBitMap.add(1);
- partitionBitMap.add(2);
- partitionBitMap.add(1);
- List<Integer> partitionList = Lists.newArrayList(1, 2);
- item.createMetadataIfNotExist("1/1");
- item.createMetadataIfNotExist("1/2");
- item.updateWrite("1/1", 100, partitionList);
- item.updateWrite("1/2", 50, Lists.newArrayList());
- assertEquals(150L, item.getMetaData().getDiskSize().get());
- assertEquals(2, item.getMetaData().getNotUploadedPartitions("1/1").getCardinality());
- assertTrue(partitionBitMap.contains(item.getMetaData().getNotUploadedPartitions("1/1")));
- return item;
- }
-
- @Test
- public void concurrentRemoveResourcesTest() throws Exception {
- LocalStorage item = prepareDiskItem();
- Runnable runnable = () -> item.removeResources("1/1");
- List<Thread> testThreads = Lists.newArrayList(new Thread(runnable), new Thread(runnable), new Thread(runnable));
- testThreads.forEach(Thread::start);
- testThreads.forEach(t -> {
- try {
- t.join();
- } catch (InterruptedException e) {
- // ignore
- }
- });
-
- assertEquals(50L, item.getMetaData().getDiskSize().get());
- assertEquals(0L, item.getMetaData().getShuffleSize("1/1"));
- assertEquals(50L, item.getMetaData().getShuffleSize("1/2"));
- assertTrue(item.getMetaData().getNotUploadedPartitions("1/1").isEmpty());
- }
-
- @Test
- public void diskMetaTest() {
- LocalStorage item = createTestStorage(testBaseDir);
- List<Integer> partitionList1 = Lists.newArrayList(1, 2, 3, 4, 5);
- List<Integer> partitionList2 = Lists.newArrayList(6, 7, 8, 9, 10);
- List<Integer> partitionList3 = Lists.newArrayList(1, 2, 3);
- item.createMetadataIfNotExist("key1");
- item.createMetadataIfNotExist("key2");
- item.updateWrite("key1", 10, partitionList1);
- item.updateWrite("key2", 30, partitionList2);
- item.updateUploadedShuffle("key1", 5, partitionList3);
-
- assertTrue(item.getNotUploadedPartitions("notKey").isEmpty());
- assertEquals(2, item.getNotUploadedPartitions("key1").getCardinality());
- assertEquals(5, item.getNotUploadedPartitions("key2").getCardinality());
- assertEquals(0, item.getNotUploadedSize("notKey"));
- assertEquals(5, item.getNotUploadedSize("key1"));
- assertEquals(30, item.getNotUploadedSize("key2"));
-
- assertTrue(item.getSortedShuffleKeys(true, 1).isEmpty());
- assertTrue(item.getSortedShuffleKeys(true, 2).isEmpty());
- item.prepareStartRead("key1");
- assertEquals(1, item.getSortedShuffleKeys(true, 3).size());
- assertEquals(1, item.getSortedShuffleKeys(false, 1).size());
- assertEquals("key2", item.getSortedShuffleKeys(false, 1).get(0));
- assertEquals(2, item.getSortedShuffleKeys(false, 2).size());
- assertEquals(2, item.getSortedShuffleKeys(false, 3).size());
- }
-
@Test
public void diskStorageInfoTest() {
LocalStorage item = LocalStorage.newBuilder()