You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2023/05/16 06:02:14 UTC

[incubator-uniffle] branch master updated: [MINOR] Remove unused code of shuffle upload (#883)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 40e75122 [MINOR] Remove unused code of shuffle upload (#883)
40e75122 is described below

commit 40e75122c279adcb3f59662fc53cd164089e540b
Author: roryqi <ro...@apache.org>
AuthorDate: Tue May 16 14:02:09 2023 +0800

    [MINOR] Remove unused code of shuffle upload (#883)
    
    ### What changes were proposed in this pull request?
    1. Remove the meta lock; this lock was used by the shuffle upload function.
    2. Remove the metadata of the shuffle upload function.
    
    ### Why are the changes needed?
    To simplify our logic by removing unused code.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    GA passed.
---
 .../server/storage/SingleStorageManager.java       | 11 ---
 .../apache/uniffle/storage/common/HdfsStorage.java | 10 ---
 .../uniffle/storage/common/LocalStorage.java       | 69 ++----------------
 .../uniffle/storage/common/LocalStorageMeta.java   | 82 ----------------------
 .../org/apache/uniffle/storage/common/Storage.java |  4 --
 .../uniffle/storage/common/LocalStorageTest.java   | 82 ----------------------
 6 files changed, 7 insertions(+), 251 deletions(-)

diff --git a/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
index 2acc49c4..9bda5f25 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
@@ -55,15 +55,6 @@ public abstract class SingleStorageManager implements StorageManager {
   public boolean write(Storage storage, ShuffleWriteHandler handler, ShuffleDataFlushEvent event) {
     String shuffleKey = RssUtils.generateShuffleKey(event.getAppId(), event.getShuffleId());
     storage.createMetadataIfNotExist(shuffleKey);
-
-    boolean locked = storage.lockShuffleShared(shuffleKey);
-    if (!locked) {
-      LOG.warn("AppId {} shuffleId {} was removed already, lock don't exist {} should be dropped,"
-          + " may leak one handler", event.getAppId(), event.getShuffleId(), event);
-      throw new IllegalStateException("AppId " + event.getAppId() + " ShuffleId " + event.getShuffleId()
-        + " was removed");
-    }
-
     try {
       long startWrite = System.currentTimeMillis();
       handler.write(event.getShuffleBlocks());
@@ -74,8 +65,6 @@ public abstract class SingleStorageManager implements StorageManager {
       LOG.warn("Exception happened when write data for " + event + ", try again", e);
       ShuffleServerMetrics.counterWriteException.inc();
       Uninterruptibles.sleepUninterruptibly(1000, TimeUnit.MILLISECONDS);
-    } finally {
-      storage.unlockShuffleShared(shuffleKey);
     }
     return false;
   }
diff --git a/storage/src/main/java/org/apache/uniffle/storage/common/HdfsStorage.java b/storage/src/main/java/org/apache/uniffle/storage/common/HdfsStorage.java
index 4b023d1e..118686a7 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/HdfsStorage.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/HdfsStorage.java
@@ -67,16 +67,6 @@ public class HdfsStorage extends AbstractStorage {
     return true;
   }
 
-  @Override
-  public boolean lockShuffleShared(String shuffleKey) {
-    return true;
-  }
-
-  @Override
-  public boolean unlockShuffleShared(String shuffleKey) {
-    return true;
-  }
-
   @Override
   public void updateReadMetrics(StorageReadMetrics metrics) {
     // do nothing
diff --git a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
index b5ca09e8..6b564d57 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
@@ -24,11 +24,9 @@ import java.nio.file.Files;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.locks.ReadWriteLock;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.io.FileUtils;
-import org.roaringbitmap.RoaringBitmap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -101,26 +99,6 @@ public class LocalStorage extends AbstractStorage {
     return STORAGE_HOST;
   }
 
-  @Override
-  public boolean lockShuffleShared(String shuffleKey) {
-    ReadWriteLock lock = getLock(shuffleKey);
-    if (lock == null) {
-      return false;
-    }
-    lock.readLock().lock();
-    return true;
-  }
-
-  @Override
-  public boolean unlockShuffleShared(String shuffleKey) {
-    ReadWriteLock lock = getLock(shuffleKey);
-    if (lock == null) {
-      return false;
-    }
-    lock.readLock().unlock();
-    return true;
-  }
-
   @Override
   public void updateWriteMetrics(StorageWriteMetrics metrics) {
     updateWrite(RssUtils.generateShuffleKey(metrics.getAppId(), metrics.getShuffleId()),
@@ -189,15 +167,6 @@ public class LocalStorage extends AbstractStorage {
     metaData.updateShuffleLastReadTs(shuffleKey);
   }
 
-  public RoaringBitmap getNotUploadedPartitions(String key) {
-    return metaData.getNotUploadedPartitions(key);
-  }
-
-  public void updateUploadedShuffle(String shuffleKey, long size, List<Integer> partitions) {
-    metaData.updateUploadedShuffleSize(shuffleKey, size);
-    metaData.addUploadedShufflePartitionList(shuffleKey, partitions);
-  }
-
   public long getDiskSize() {
     return metaData.getDiskSize().longValue();
   }
@@ -226,40 +195,16 @@ public class LocalStorage extends AbstractStorage {
   // add the shuffle key back to the expiredShuffleKeys if get lock but fail to acquire write lock.
   public void removeResources(String shuffleKey) {
     LOG.info("Start to remove resource of {}", shuffleKey);
-    ReadWriteLock lock = metaData.getLock(shuffleKey);
-    if (lock == null) {
-      LOG.info("Ignore shuffle {} for its resource was removed already", shuffleKey);
-      return;
-    }
-
-    if (lock.writeLock().tryLock()) {
-      try {
-        metaData.updateDiskSize(-metaData.getShuffleSize(shuffleKey));
-        metaData.remoteShuffle(shuffleKey);
-        LOG.info("Finish remove resource of {}, disk size is {} and {} shuffle metadata",
-            shuffleKey, metaData.getDiskSize(), metaData.getShuffleMetaSet().size());
-      } catch (Exception e) {
-        LOG.error("Fail to update disk size", e);
-      } finally {
-        lock.writeLock().unlock();
-      }
-    } else {
-      LOG.info("Fail to get write lock of {}, add it back to expired shuffle queue", shuffleKey);
+    try {
+      metaData.updateDiskSize(-metaData.getShuffleSize(shuffleKey));
+      metaData.remoteShuffle(shuffleKey);
+      LOG.info("Finish remove resource of {}, disk size is {} and {} shuffle metadata",
+          shuffleKey, metaData.getDiskSize(), metaData.getShuffleMetaSet().size());
+    } catch (Exception e) {
+      LOG.error("Fail to update disk size", e);
     }
   }
 
-  public ReadWriteLock getLock(String shuffleKey) {
-    return metaData.getLock(shuffleKey);
-  }
-
-  public long getNotUploadedSize(String key) {
-    return metaData.getNotUploadedSize(key);
-  }
-
-  public List<String> getSortedShuffleKeys(boolean checkRead, int num) {
-    return metaData.getSortedShuffleKeys(checkRead, num);
-  }
-
   public boolean isCorrupted() {
     return isCorrupted;
   }
diff --git a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java
index e67cd87b..cc82a8e6 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java
@@ -19,13 +19,9 @@ package org.apache.uniffle.storage.common;
 
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.roaringbitmap.RoaringBitmap;
@@ -47,52 +43,6 @@ public class LocalStorageMeta {
   private final AtomicLong size = new AtomicLong(0L);
   private final Map<String, ShuffleMeta> shuffleMetaMap = JavaUtils.newConcurrentMap();
 
-  // todo: add ut
-  public List<String> getSortedShuffleKeys(boolean checkRead, int hint) {
-    // Filter the unread shuffle is checkRead is true
-    // Filter the remaining size is 0
-    List<Map.Entry<String, ShuffleMeta>> shuffleMetaList = shuffleMetaMap
-        .entrySet()
-        .stream()
-        .filter(e -> (!checkRead || e.getValue().isStartRead.get()) && e.getValue().getNotUploadedSize() > 0)
-        .collect(Collectors.toList());
-
-    shuffleMetaList.sort((Entry<String, ShuffleMeta> o1, Entry<String, ShuffleMeta> o2) -> {
-      long sz1 = o1.getValue().getSize().longValue();
-      long sz2 = o2.getValue().getSize().longValue();
-      return Long.compare(sz2, sz1);
-    });
-
-    return shuffleMetaList
-        .subList(0, Math.min(shuffleMetaList.size(), hint))
-        .stream()
-        .map(Entry::getKey).collect(Collectors.toList());
-  }
-
-  public RoaringBitmap getNotUploadedPartitions(String shuffleKey) {
-    ShuffleMeta shuffleMeta = getShuffleMeta(shuffleKey);
-    if (shuffleMeta == null) {
-      return RoaringBitmap.bitmapOf();
-    }
-
-    RoaringBitmap partitionBitmap;
-    RoaringBitmap uploadedPartitionBitmap;
-    synchronized (shuffleMeta.partitionBitmap) {
-      partitionBitmap = shuffleMeta.partitionBitmap.clone();
-    }
-    synchronized (shuffleMeta.uploadedPartitionBitmap) {
-      uploadedPartitionBitmap = shuffleMeta.uploadedPartitionBitmap.clone();
-    }
-    for (int partition : uploadedPartitionBitmap) {
-      partitionBitmap.remove(partition);
-    }
-    return partitionBitmap;
-  }
-
-  public long getNotUploadedSize(String shuffleKey) {
-    ShuffleMeta shuffleMeta = getShuffleMeta(shuffleKey);
-    return shuffleMeta == null ? 0 : shuffleMeta.getNotUploadedSize();
-  }
 
   public void updateDiskSize(long delta) {
     size.addAndGet(delta);
@@ -105,13 +55,6 @@ public class LocalStorageMeta {
     }
   }
 
-  public void updateUploadedShuffleSize(String shuffleKey, long delta) {
-    ShuffleMeta shuffleMeta = getShuffleMeta(shuffleKey);
-    if (shuffleMeta != null) {
-      shuffleMeta.uploadedSize.addAndGet(delta);
-    }
-  }
-
   public void addShufflePartitionList(String shuffleKey, List<Integer> partitions) {
     ShuffleMeta shuffleMeta = getShuffleMeta(shuffleKey);
     if (shuffleMeta != null) {
@@ -122,16 +65,6 @@ public class LocalStorageMeta {
     }
   }
 
-  public void addUploadedShufflePartitionList(String shuffleKey, List<Integer> partitions) {
-    ShuffleMeta shuffleMeta = getShuffleMeta(shuffleKey);
-    if (shuffleMeta != null) {
-      RoaringBitmap bitmap = shuffleMeta.uploadedPartitionBitmap;
-      synchronized (bitmap) {
-        partitions.forEach(bitmap::add);
-      }
-    }
-  }
-
   public void prepareStartRead(String shuffleId) {
     ShuffleMeta shuffleMeta = getShuffleMeta(shuffleId);
     if (shuffleMeta != null) {
@@ -199,29 +132,17 @@ public class LocalStorageMeta {
     }
   }
 
-  public ReadWriteLock getLock(String shuffleKey) {
-    ShuffleMeta shuffleMeta = getShuffleMeta(shuffleKey);
-    return shuffleMeta == null ? null : shuffleMeta.getLock();
-  }
-
   // Consider that ShuffleMeta is a simple class, we keep the class ShuffleMeta as an inner class.
   private static class ShuffleMeta {
     private final AtomicLong size = new AtomicLong(0);
     private final RoaringBitmap partitionBitmap = RoaringBitmap.bitmapOf();
-    private final AtomicLong uploadedSize = new AtomicLong(0);
-    private final ReadWriteLock lock = new ReentrantReadWriteLock();
     private final AtomicBoolean isStartRead = new AtomicBoolean(false);
-    private final RoaringBitmap uploadedPartitionBitmap = RoaringBitmap.bitmapOf();
     private final AtomicLong lastReadTs = new AtomicLong(-1L);
 
     public AtomicLong getSize() {
       return size;
     }
 
-    public long getNotUploadedSize() {
-      return size.longValue() - uploadedSize.longValue();
-    }
-
     public void markStartRead() {
       isStartRead.set(true);
     }
@@ -230,8 +151,5 @@ public class LocalStorageMeta {
       lastReadTs.set(System.currentTimeMillis());
     }
 
-    public ReadWriteLock getLock() {
-      return lock;
-    }
   }
 }
diff --git a/storage/src/main/java/org/apache/uniffle/storage/common/Storage.java b/storage/src/main/java/org/apache/uniffle/storage/common/Storage.java
index f2fb9c61..452701f9 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/Storage.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/Storage.java
@@ -29,10 +29,6 @@ public interface Storage {
 
   boolean canWrite();
 
-  boolean lockShuffleShared(String shuffleKey);
-
-  boolean unlockShuffleShared(String shuffleKey);
-
   void updateWriteMetrics(StorageWriteMetrics metrics);
 
   void updateReadMetrics(StorageReadMetrics metrics);
diff --git a/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java b/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java
index 3f1a17cd..de0f8e1e 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java
@@ -21,19 +21,15 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.attribute.PosixFilePermission;
-import java.util.List;
 import java.util.Set;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
-import org.roaringbitmap.RoaringBitmap;
 
 import org.apache.uniffle.common.storage.StorageMedia;
-import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.storage.request.CreateShuffleWriteHandlerRequest;
 import org.apache.uniffle.storage.util.StorageType;
 
@@ -130,84 +126,6 @@ public class LocalStorageTest {
     createTestStorage(notExisted);
   }
 
-  @Test
-  public void removeResourcesTest() throws Exception {
-    LocalStorage item = prepareDiskItem();
-    final String key1 = RssUtils.generateShuffleKey("1", 1);
-    final String key2 = RssUtils.generateShuffleKey("1", 2);
-    item.removeResources(key1);
-    assertEquals(50L, item.getMetaData().getDiskSize().get());
-    assertEquals(0L, item.getMetaData().getShuffleSize(key1));
-    assertEquals(50L, item.getMetaData().getShuffleSize(key2));
-    assertTrue(item.getMetaData().getNotUploadedPartitions(key1).isEmpty());
-  }
-
-  private LocalStorage prepareDiskItem() {
-    final LocalStorage item = createTestStorage(testBaseDir);
-    RoaringBitmap partitionBitMap = RoaringBitmap.bitmapOf();
-    partitionBitMap.add(1);
-    partitionBitMap.add(2);
-    partitionBitMap.add(1);
-    List<Integer> partitionList = Lists.newArrayList(1, 2);
-    item.createMetadataIfNotExist("1/1");
-    item.createMetadataIfNotExist("1/2");
-    item.updateWrite("1/1", 100, partitionList);
-    item.updateWrite("1/2", 50, Lists.newArrayList());
-    assertEquals(150L, item.getMetaData().getDiskSize().get());
-    assertEquals(2, item.getMetaData().getNotUploadedPartitions("1/1").getCardinality());
-    assertTrue(partitionBitMap.contains(item.getMetaData().getNotUploadedPartitions("1/1")));
-    return item;
-  }
-
-  @Test
-  public void concurrentRemoveResourcesTest() throws Exception {
-    LocalStorage item = prepareDiskItem();
-    Runnable runnable = () -> item.removeResources("1/1");
-    List<Thread> testThreads = Lists.newArrayList(new Thread(runnable), new Thread(runnable), new Thread(runnable));
-    testThreads.forEach(Thread::start);
-    testThreads.forEach(t -> {
-      try {
-        t.join();
-      } catch (InterruptedException e) {
-        // ignore
-      }
-    });
-
-    assertEquals(50L, item.getMetaData().getDiskSize().get());
-    assertEquals(0L, item.getMetaData().getShuffleSize("1/1"));
-    assertEquals(50L, item.getMetaData().getShuffleSize("1/2"));
-    assertTrue(item.getMetaData().getNotUploadedPartitions("1/1").isEmpty());
-  }
-
-  @Test
-  public void diskMetaTest() {
-    LocalStorage item = createTestStorage(testBaseDir);
-    List<Integer> partitionList1 = Lists.newArrayList(1, 2, 3, 4, 5);
-    List<Integer> partitionList2 = Lists.newArrayList(6, 7, 8, 9, 10);
-    List<Integer> partitionList3 = Lists.newArrayList(1, 2, 3);
-    item.createMetadataIfNotExist("key1");
-    item.createMetadataIfNotExist("key2");
-    item.updateWrite("key1", 10, partitionList1);
-    item.updateWrite("key2", 30, partitionList2);
-    item.updateUploadedShuffle("key1", 5, partitionList3);
-
-    assertTrue(item.getNotUploadedPartitions("notKey").isEmpty());
-    assertEquals(2, item.getNotUploadedPartitions("key1").getCardinality());
-    assertEquals(5, item.getNotUploadedPartitions("key2").getCardinality());
-    assertEquals(0, item.getNotUploadedSize("notKey"));
-    assertEquals(5, item.getNotUploadedSize("key1"));
-    assertEquals(30, item.getNotUploadedSize("key2"));
-
-    assertTrue(item.getSortedShuffleKeys(true, 1).isEmpty());
-    assertTrue(item.getSortedShuffleKeys(true, 2).isEmpty());
-    item.prepareStartRead("key1");
-    assertEquals(1, item.getSortedShuffleKeys(true, 3).size());
-    assertEquals(1, item.getSortedShuffleKeys(false, 1).size());
-    assertEquals("key2", item.getSortedShuffleKeys(false, 1).get(0));
-    assertEquals(2, item.getSortedShuffleKeys(false, 2).size());
-    assertEquals(2, item.getSortedShuffleKeys(false, 3).size());
-  }
-
   @Test
   public void diskStorageInfoTest() {
     LocalStorage item = LocalStorage.newBuilder()