You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2020/10/29 16:48:37 UTC

[GitHub] [hadoop] steveloughran commented on a change in pull request #2396: HADOOP-17313. FileSystem.get to support slow-to-instantiate FS clients.

steveloughran commented on a change in pull request #2396:
URL: https://github.com/apache/hadoop/pull/2396#discussion_r514411189



##########
File path: hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileSystemCaching.java
##########
@@ -336,4 +344,134 @@ public void testCacheIncludesURIUserInfo() throws Throwable {
     assertNotEquals(keyA, new FileSystem.Cache.Key(
         new URI("wasb://a:password@account.blob.core.windows.net"), conf));
   }
+
+
+  /**
+   * Single semaphore: no surplus FS instances will be created
+   * and then discarded.
+   */
+  @Test
+  public void testCacheSingleSemaphoredConstruction() throws Exception {
+    FileSystem.Cache cache = semaphoredCache(1);
+    createFileSystems(cache, 10);
+    Assertions.assertThat(cache.getDiscardedInstances())
+        .describedAs("Discarded FS instances")
+        .isEqualTo(0);
+  }
+
+  /**
+   * Dual semaphore: thread 2 will get as far as
+   * blocking in the initialize() method while awaiting
+   * thread 1 to complete its initialization.
+   * <p></p>
+   * The thread 2 FS instance will be discarded.
+   * All other threads will block for a cache semaphore,
+   * so when they are given an opportunity to proceed,
+   * they will find that an FS instance exists.
+   */
+  @Test
+  public void testCacheDualSemaphoreConstruction() throws Exception {
+    FileSystem.Cache cache = semaphoredCache(2);
+    createFileSystems(cache, 10);
+    Assertions.assertThat(cache.getDiscardedInstances())
+        .describedAs("Discarded FS instances")
+        .isEqualTo(1);
+  }
+
+  /**
+   * Construct the FS instances in a cache with effectively no
+   * limit on the number of instances which can be created
+   * simultaneously.
+   * <p></p>
+   * This is the effective state before HADOOP-17313.
+   * <p></p>
+   * All but one thread's FS instance will be discarded.
+   */
+  @Test
+  public void testCacheLargeSemaphoreConstruction() throws Exception {
+    FileSystem.Cache cache = semaphoredCache(999);
+    int count = 10;
+    createFileSystems(cache, count);
+    Assertions.assertThat(cache.getDiscardedInstances())
+        .describedAs("Discarded FS instances")
+        .isEqualTo(count -1);
+  }
+
+  /**
+   * Create a cache with a given semaphore size.
+   * @param semaphores number of semaphores
+   * @return the cache.
+   */
+  private FileSystem.Cache semaphoredCache(final int semaphores) {
+    final Configuration conf1 = new Configuration();
+    conf1.setInt(FS_CREATION_PARALLEL_COUNT, semaphores);
+    FileSystem.Cache cache = new FileSystem.Cache(conf1);
+    return cache;
+  }
+
+  /**
+   * Attempt to create {@code count} filesystems in parallel,
+   * then assert that they are all equal.
+   * @param cache cache to use
+   * @param count count of filesystems to instantiate
+   */
+  private void createFileSystems(final FileSystem.Cache cache, final int count)
+      throws URISyntaxException, InterruptedException,
+             java.util.concurrent.ExecutionException {
+    final Configuration conf = new Configuration();
+    conf.set("fs.blocking.impl", BlockingInitializer.NAME);
+    // only one instance can be created at a time.
+    URI uri = new URI("blocking://a");
+    ListeningExecutorService pool =
+        BlockingThreadPoolExecutorService.newInstance(count * 2, 0,
+            10, TimeUnit.SECONDS,
+            "creation-threads");
+
+    // submit a set of requests to create an FS instance.
+    // the semaphore will block all but one, and that will block until
+    // it is allowed to continue
+    List<ListenableFuture<FileSystem>> futures = new ArrayList<>(count);
+
+    // acquire the semaphore so blocking all FS instances from
+    // being initialized.
+    Semaphore semaphore = BlockingInitializer.sem;
+    semaphore.acquire();
+
+    // su

Review comment:
       cut




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-issues-help@hadoop.apache.org