You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:20:06 UTC

svn commit: r1181558 - in /hbase/branches/0.89/src: main/java/org/apache/hadoop/hbase/util/IdLock.java test/java/org/apache/hadoop/hbase/util/TestIdLock.java

Author: nspiegelberg
Date: Tue Oct 11 02:20:06 2011
New Revision: 1181558

URL: http://svn.apache.org/viewvc?rev=1181558&view=rev
Log:
"IdLock" synchronization primitive that will be used to synchronize on block offsets in HFileReaderV2

Summary:
HFile block read operations are currently synchronized on the block index key
object for the particular block. This is done so that only one client goes to
HDFS to read the block and caches it, and all other clients interested in the
block pick it up from the cache. However, with HFile format v2, the data block
index might be multi-level, so there is not a canonical object to use for
synchronizing clients. The IdLock synchronization primitive allows to
synchronize on arbitrary long numbers, creating temporary objects for them as
necessary.

Test Plan:
New unit test. Load testing as part of HFile format v2 testing.

Reviewed By: kannan
Reviewers: kannan, kranganathan, liyintang
Commenters: nspiegelberg, jgray
CC: , hbase@lists, kannan, nspiegelberg, jgray
Revert Plan:
OK

Differential Revision: 248448

Added:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/IdLock.java
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/util/TestIdLock.java

Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/IdLock.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/IdLock.java?rev=1181558&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/IdLock.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/IdLock.java Tue Oct 11 02:20:06 2011
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Allows multiple concurrent clients to lock on a numeric id with a minimal
+ * memory overhead. The intended usage is as follows:
+ *
+ * <pre>
+ * IdLock.Entry lockEntry = idLock.getLockEntry(id);
+ * try {
+ *   // User code.
+ * } finally {
+ *   idLock.releaseLockEntry(lockEntry);
+ * }</pre>
+ */
+public class IdLock {
+
+  /** An entry returned to the client as a lock object */
+  public static class Entry {
+    private final Long id;
+    private int numWaiters;
+    private boolean isLocked = true;
+
+    public Entry(long id) {
+      this.id = id;
+    }
+
+    public String toString() {
+      return "id=" + id + ", numWaiter=" + numWaiters + ", isLocked="
+          + isLocked;
+    }
+  }
+
+  private ConcurrentMap<Long, Entry> map =
+      new ConcurrentHashMap<Long, Entry>();
+
+  /**
+   * Blocks until the lock corresponding to the given id is acquired.
+   *
+   * @param id an arbitrary number to lock on
+   * @return an "entry" to pass to {@link #releaseLockEntry(Entry)} to release
+   *         the lock
+   * @throws IOException if interrupted
+   */
+  public Entry getLockEntry(long id) throws IOException {
+    Entry entry = new Entry(id);
+    Entry existing;
+    while ((existing = map.putIfAbsent(entry.id, entry)) != null) {
+      synchronized (existing) {
+        if (existing.isLocked) {
+          ++existing.numWaiters;  // Add ourselves to waiters.
+          while (existing.isLocked) {
+            try {
+              existing.wait();
+            } catch (InterruptedException e) {
+              --existing.numWaiters;  // Remove ourselves from waiters.
+              throw new InterruptedIOException(
+                  "Interrupted waiting to acquire sparse lock");
+            }
+          }
+
+          --existing.numWaiters;  // Remove ourselves from waiters.
+          existing.isLocked = true;
+          return existing;
+        }
+        // If the entry is not locked, it might already be deleted from the
+        // map, so we cannot return it. We need to get our entry into the map
+        // or get someone else's locked entry.
+      }
+    }
+    return entry;
+  }
+
+  /**
+   * Must be called in a finally block to decrease the internal counter and
+   * remove the monitor object for the given id if the caller is the last
+   * client.
+   *
+   * @param entry the return value of {@link #getLockEntry(long)}
+   */
+  public void releaseLockEntry(Entry entry) {
+    synchronized (entry) {
+      entry.isLocked = false;
+      if (entry.numWaiters > 0) {
+        entry.notify();
+      } else {
+        map.remove(entry.id);
+      }
+    }
+  }
+
+  /** For testing */
+  void assertMapEmpty() {
+    assert map.size() == 0;
+  }
+
+}

Added: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/util/TestIdLock.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/util/TestIdLock.java?rev=1181558&view=auto
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/util/TestIdLock.java (added)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/util/TestIdLock.java Tue Oct 11 02:20:06 2011
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import static org.junit.Assert.*;
+import org.junit.Test;
+
+public class TestIdLock {
+
+  private static final Log LOG = LogFactory.getLog(TestIdLock.class);
+
+  private static final int NUM_IDS = 16;
+  private static final int NUM_THREADS = 128;
+  private static final int NUM_SECONDS = 20;
+
+  private IdLock idLock = new IdLock();
+
+  private Map<Long, String> idOwner = new ConcurrentHashMap<Long, String>();
+
+  private class IdLockTestThread implements Callable<Boolean> {
+
+    private String clientId;
+
+    public IdLockTestThread(String clientId) {
+      this.clientId = clientId;
+    }
+
+    @Override
+    public Boolean call() throws Exception {
+      Thread.currentThread().setName(clientId);
+      Random rand = new Random();
+      long endTime = System.currentTimeMillis() + NUM_SECONDS * 1000;
+      while (System.currentTimeMillis() < endTime) {
+        long id = rand.nextInt(NUM_IDS);
+
+        LOG.info(clientId + " is waiting for id " + id);
+        IdLock.Entry lockEntry = idLock.getLockEntry(id);
+        try {
+          int sleepMs = 1 + rand.nextInt(4);
+          String owner = idOwner.get(id);
+          if (owner != null) {
+            LOG.error("Id " + id + " already taken by " + owner + ", "
+                + clientId + " failed");
+            return false;
+          }
+
+          idOwner.put(id, clientId);
+          LOG.info(clientId + " took id " + id + ", sleeping for " +
+              sleepMs + "ms");
+          Thread.sleep(sleepMs);
+          LOG.info(clientId + " is releasing id " + id);
+          idOwner.remove(id);
+
+        } finally {
+          idLock.releaseLockEntry(lockEntry);
+        }
+      }
+      return true;
+    }
+
+  }
+
+  @Test
+  public void testMultipleClients() throws Exception {
+    ExecutorService exec = Executors.newFixedThreadPool(NUM_THREADS);
+    try {
+      ExecutorCompletionService<Boolean> ecs =
+          new ExecutorCompletionService<Boolean>(exec);
+      for (int i = 0; i < NUM_THREADS; ++i)
+        ecs.submit(new IdLockTestThread("client_" + i));
+      for (int i = 0; i < NUM_THREADS; ++i) {
+        Future<Boolean> result = ecs.take();
+        assertTrue(result.get());
+      }
+      idLock.assertMapEmpty();
+    } finally {
+      exec.shutdown();
+    }
+  }
+
+}