You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ro...@apache.org on 2014/07/24 08:27:09 UTC
git commit: OOZIE-1906 Service to periodically remove ZK lock (puru
via rohini)
Repository: oozie
Updated Branches:
refs/heads/master 5be0a8705 -> 12ef61470
OOZIE-1906 Service to periodically remove ZK lock (puru via rohini)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/12ef6147
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/12ef6147
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/12ef6147
Branch: refs/heads/master
Commit: 12ef61470e12aa9885220de1e453dec1da05b28c
Parents: 5be0a87
Author: Rohini Palaniswamy <ro...@apache.org>
Authored: Wed Jul 23 23:27:01 2014 -0700
Committer: Rohini Palaniswamy <ro...@apache.org>
Committed: Wed Jul 23 23:27:01 2014 -0700
----------------------------------------------------------------------
.../apache/oozie/service/ZKLocksService.java | 42 ++++++++++++++++++--
core/src/main/resources/oozie-default.xml | 18 +++++++++
.../oozie/service/TestZKLocksService.java | 21 ++++++++++
pom.xml | 7 ++--
release-log.txt | 1 +
5 files changed, 83 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/12ef6147/core/src/main/java/org/apache/oozie/service/ZKLocksService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/ZKLocksService.java b/core/src/main/java/org/apache/oozie/service/ZKLocksService.java
index d03a899..3c642db 100644
--- a/core/src/main/java/org/apache/oozie/service/ZKLocksService.java
+++ b/core/src/main/java/org/apache/oozie/service/ZKLocksService.java
@@ -17,10 +17,16 @@
*/
package org.apache.oozie.service;
+import java.io.IOException;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.curator.framework.recipes.locks.ChildReaper;
+import org.apache.curator.framework.recipes.locks.Reaper;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
+import org.apache.curator.utils.ThreadUtils;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.util.Instrumentable;
import org.apache.oozie.util.Instrumentation;
@@ -28,18 +34,30 @@ import org.apache.oozie.lock.LockToken;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.ZKUtils;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* Service that provides distributed locks via ZooKeeper. Requires that a ZooKeeper ensemble is available. The locks will be
* located under a ZNode named "locks" under the namespace (see {@link ZKUtils}). For example, with default settings, if the
* resource we're locking is called "foo", then the ZNode backing the lock will be at /oozie/locks/foo.
+ * <p>
+ * ChildReaper is used for deleting unused locks. Only one childreaper will be active in cluster.
+ * ZK Path /oozie.zookeeper.namespace/services/locksChildReaperLeaderPath is used for leader selection.
*/
+
public class ZKLocksService extends MemoryLocksService implements Service, Instrumentable {
private ZKUtils zk;
private static XLog LOG = XLog.getLog(ZKLocksService.class);
- private static final String LOCKS_NODE = "/locks/";
+ public static final String LOCKS_NODE = "/locks";
private final AtomicLong lockCount = new AtomicLong();
+ private static final String REAPING_LEADER_PATH = ZKUtils.ZK_BASE_SERVICES_PATH + "/locksChildReaperLeaderPath";
+ public static final int DEFAULT_REAPING_THRESHOLD = 300; // In sec
+ public static final String REAPING_THRESHOLD = CONF_PREFIX + "ZKLocksService.locks.reaper.threshold";
+ public static final String REAPING_THREADS = CONF_PREFIX + "ZKLocksService.locks.reaper.threads";
+ private ChildReaper reaper = null;
+
/**
* Initialize the zookeeper locks service
*
@@ -50,6 +68,9 @@ public class ZKLocksService extends MemoryLocksService implements Service, Instr
super.init(services);
try {
zk = ZKUtils.register(this);
+ reaper = new ChildReaper(zk.getClient(), LOCKS_NODE, Reaper.Mode.REAP_INDEFINITELY, getExecutorService(),
+ services.getConf().getInt(REAPING_THRESHOLD, DEFAULT_REAPING_THRESHOLD) * 1000, REAPING_LEADER_PATH);
+ reaper.start();
}
catch (Exception ex) {
throw new ServiceException(ErrorCode.E1700, ex.getMessage(), ex);
@@ -62,6 +83,15 @@ public class ZKLocksService extends MemoryLocksService implements Service, Instr
*/
@Override
public void destroy() {
+ if (reaper != null) {
+ try {
+ reaper.close();
+ }
+ catch (IOException e) {
+ LOG.error("Error closing childReaper", e);
+ }
+ }
+
if (zk != null) {
zk.unregister(this);
}
@@ -95,7 +125,7 @@ public class ZKLocksService extends MemoryLocksService implements Service, Instr
*/
@Override
public LockToken getReadLock(String resource, long wait) throws InterruptedException {
- InterProcessReadWriteLock lock = new InterProcessReadWriteLock(zk.getClient(), LOCKS_NODE + resource);
+ InterProcessReadWriteLock lock = new InterProcessReadWriteLock(zk.getClient(), LOCKS_NODE + "/" + resource);
InterProcessMutex readLock = lock.readLock();
return acquireLock(wait, readLock);
}
@@ -110,7 +140,7 @@ public class ZKLocksService extends MemoryLocksService implements Service, Instr
*/
@Override
public LockToken getWriteLock(String resource, long wait) throws InterruptedException {
- InterProcessReadWriteLock lock = new InterProcessReadWriteLock(zk.getClient(), LOCKS_NODE + resource);
+ InterProcessReadWriteLock lock = new InterProcessReadWriteLock(zk.getClient(), LOCKS_NODE + "/" + resource);
InterProcessMutex writeLock = lock.writeLock();
return acquireLock(wait, writeLock);
}
@@ -157,4 +187,10 @@ public class ZKLocksService extends MemoryLocksService implements Service, Instr
}
}
}
+
+ private static ScheduledExecutorService getExecutorService() {
+ return ThreadUtils.newFixedThreadScheduledPool(Services.get().getConf().getInt(REAPING_THREADS, 2),
+ "ZKLocksChildReaper");
+ }
+
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/12ef6147/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index 4a58e9b..ebceaa7 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -2166,4 +2166,22 @@
</description>
</property>
+ <property>
+ <name>oozie.service.ZKLocksService.locks.reaper.threshold</name>
+ <value>300</value>
+ <description>
+ The frequency at which the ChildReaper will run.
+ Duration should be in sec. Default is 5 min.
+ </description>
+ </property>
+
+ <property>
+ <name>oozie.service.ZKLocksService.locks.reaper.threads</name>
+ <value>2</value>
+ <description>
+ Number of fixed threads used by ChildReaper to
+ delete empty locks.
+ </description>
+ </property>
+
</configuration>
http://git-wip-us.apache.org/repos/asf/oozie/blob/12ef6147/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java b/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java
index a773469..5ce8ecb 100644
--- a/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java
@@ -20,6 +20,7 @@ package org.apache.oozie.service;
import org.apache.oozie.lock.LockToken;
import org.apache.oozie.util.*;
import org.apache.oozie.test.ZKXTestCase;
+import org.apache.zookeeper.data.Stat;
public class TestZKLocksService extends ZKXTestCase {
private XLog log = XLog.getLog(getClass());
@@ -415,4 +416,24 @@ public class TestZKLocksService extends ZKXTestCase {
sleep(1000);
assertEquals("a:1-L a:1-U a:2-L a:2-U", sb.toString().trim());
}
+
+ public void testLockReaper() throws Exception {
+ Services.get().getConf().set(ZKLocksService.REAPING_THRESHOLD, "1");
+ ZKLocksService zkls = new ZKLocksService();
+ try {
+ zkls.init(Services.get());
+ for (int i = 0; i < 10; ++i) {
+ LockToken l = zkls.getReadLock(String.valueOf(i), 1);
+ l.release();
+
+ }
+ sleep(2000);
+ Stat stat = getClient().checkExists().forPath(ZKLocksService.LOCKS_NODE);
+ assertEquals(stat.getNumChildren(), 0);
+ }
+ finally {
+ zkls.destroy();
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/12ef6147/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7fb57e5..190bad7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -98,6 +98,7 @@
<openjpa.version>2.2.2</openjpa.version>
<xerces.version>2.10.0</xerces.version>
+ <curator.version>2.5.0</curator.version>
</properties>
<modules>
@@ -702,19 +703,19 @@
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
- <version>2.4.0</version>
+ <version>${curator.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-x-discovery</artifactId>
- <version>2.4.0</version>
+ <version>${curator.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
- <version>2.4.0</version>
+ <version>${curator.version}</version>
</dependency>
<!-- examples -->
http://git-wip-us.apache.org/repos/asf/oozie/blob/12ef6147/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 8650116..9966933 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -4,6 +4,7 @@ OOZIE-1943 Bump up trunk to 4.2.0-SNAPSHOT (bzhang)
-- Oozie 4.1.0 release (4.1 - unreleased)
+OOZIE-1906 Service to periodically remove ZK lock (puru via rohini)
OOZIE-1812 Bulk API with bundle Id should relax regex check for Id (puru via rohini)
OOZIE-1915 Move system properties to conf properties (puru via rohini)
OOZIE-1934 coordinator action repeatedly picked up by cachePurgeWorker of PartitionDependencyManagerService (ryota)