You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pb...@apache.org on 2017/04/19 13:17:25 UTC

oozie git commit: OOZIE-2843 Enhance logging inside ZKLocksService and MemoryLocksService (andras.piros via pbacsko)

Repository: oozie
Updated Branches:
  refs/heads/master fbe5e491e -> ee359d7be


OOZIE-2843 Enhance logging inside ZKLocksService and MemoryLocksService (andras.piros via pbacsko)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/ee359d7b
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/ee359d7b
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/ee359d7b

Branch: refs/heads/master
Commit: ee359d7be1deb457b59ee12e26fac206d0198182
Parents: fbe5e49
Author: Peter Bacsko <pb...@cloudera.com>
Authored: Wed Apr 19 15:17:14 2017 +0200
Committer: Peter Bacsko <pb...@cloudera.com>
Committed: Wed Apr 19 15:17:14 2017 +0200

----------------------------------------------------------------------
 .../oozie/service/MemoryLocksService.java       |  9 +++-
 .../apache/oozie/service/ZKLocksService.java    | 53 +++++++++++++-------
 .../main/java/org/apache/oozie/util/XLog.java   | 12 +++++
 release-log.txt                                 |  1 +
 4 files changed, 55 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/ee359d7b/core/src/main/java/org/apache/oozie/service/MemoryLocksService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/MemoryLocksService.java b/core/src/main/java/org/apache/oozie/service/MemoryLocksService.java
index 2ab2abc..4fa4d3e 100644
--- a/core/src/main/java/org/apache/oozie/service/MemoryLocksService.java
+++ b/core/src/main/java/org/apache/oozie/service/MemoryLocksService.java
@@ -24,12 +24,15 @@ import org.apache.oozie.lock.LockToken;
 import org.apache.oozie.lock.MemoryLocks;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.oozie.util.XLog;
 
 /**
  * Service that provides in-memory locks.  Assumes no other Oozie servers are using the database.
  */
 public class MemoryLocksService implements Service, Instrumentable {
 
+    private static final XLog LOG = XLog.getLog(MemoryLocksService.class);
+
     public static enum Type {
         READ, WRITE
     }
@@ -87,7 +90,8 @@ public class MemoryLocksService implements Service, Instrumentable {
      * @return the lock token for the resource, or <code>null</code> if the lock could not be obtained.
      * @throws InterruptedException thrown if the thread was interrupted while waiting.
      */
-    public LockToken getReadLock(String resource, long wait) throws InterruptedException {
+    public LockToken getReadLock(final String resource, final long wait) throws InterruptedException {
+        LOG.trace("Acquiring in-memory read lock. [resource={0};wait={1}]", resource, wait);
         return locks.getLock(resource, Type.READ, wait);
     }
 
@@ -99,7 +103,8 @@ public class MemoryLocksService implements Service, Instrumentable {
      * @return the lock token for the resource, or <code>null</code> if the lock could not be obtained.
      * @throws InterruptedException thrown if the thread was interrupted while waiting.
      */
-    public LockToken getWriteLock(String resource, long wait) throws InterruptedException {
+    public LockToken getWriteLock(final String resource, final long wait) throws InterruptedException {
+        LOG.trace("Acquiring in-memory write lock. [resource={0};wait={1}]", resource, wait);
         return locks.getLock(resource, Type.WRITE, wait);
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/ee359d7b/core/src/main/java/org/apache/oozie/service/ZKLocksService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/ZKLocksService.java b/core/src/main/java/org/apache/oozie/service/ZKLocksService.java
index 83790cf..2c71c00 100644
--- a/core/src/main/java/org/apache/oozie/service/ZKLocksService.java
+++ b/core/src/main/java/org/apache/oozie/service/ZKLocksService.java
@@ -25,7 +25,6 @@ import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.util.Instrumentable;
 import org.apache.oozie.util.Instrumentation;
-import org.apache.oozie.event.listener.ZKConnectionListener;
 import org.apache.oozie.lock.LockToken;
 import org.apache.oozie.util.XLog;
 import org.apache.oozie.util.ZKUtils;
@@ -35,7 +34,6 @@ import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.curator.framework.recipes.locks.ChildReaper;
 import org.apache.curator.framework.recipes.locks.Reaper;
-import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.utils.ThreadUtils;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -49,16 +47,15 @@ import com.google.common.collect.MapMaker;
 public class ZKLocksService extends MemoryLocksService implements Service, Instrumentable {
 
     private ZKUtils zk;
-    private static XLog LOG = XLog.getLog(ZKLocksService.class);
     public static final String LOCKS_NODE = "/locks";
 
-    private ConcurrentMap<String, InterProcessReadWriteLock> zkLocks = new MapMaker().weakValues().makeMap();
-
+    private static final XLog LOG = XLog.getLog(ZKLocksService.class);
+    private final ConcurrentMap<String, InterProcessReadWriteLock> zkLocks = new MapMaker().weakValues().makeMap();
+    private ChildReaper reaper = null;
 
     private static final String REAPING_LEADER_PATH = ZKUtils.ZK_BASE_SERVICES_PATH + "/locksChildReaperLeaderPath";
-    public static final String REAPING_THRESHOLD = CONF_PREFIX + "ZKLocksService.locks.reaper.threshold";
-    public static final String REAPING_THREADS = CONF_PREFIX + "ZKLocksService.locks.reaper.threads";
-    private ChildReaper reaper = null;
+    static final String REAPING_THRESHOLD = CONF_PREFIX + "ZKLocksService.locks.reaper.threshold";
+    static final String REAPING_THREADS = CONF_PREFIX + "ZKLocksService.locks.reaper.threads";
 
     /**
      * Initialize the zookeeper locks service
@@ -141,30 +138,50 @@ public class ZKLocksService extends MemoryLocksService implements Service, Instr
         return acquireLock(resource, Type.WRITE, wait);
     }
 
-    private LockToken acquireLock(final String resource, Type type, long wait) throws InterruptedException {
-        InterProcessReadWriteLock lockEntry = zkLocks.get(resource);
-        if (lockEntry == null) {
-            InterProcessReadWriteLock newLock = new InterProcessReadWriteLock(zk.getClient(), LOCKS_NODE + "/" + resource);
-            lockEntry = zkLocks.putIfAbsent(resource, newLock);
-            if (lockEntry == null) {
-                lockEntry = newLock;
-            }
+    private LockToken acquireLock(final String resource, final Type type, final long wait) throws InterruptedException {
+        LOG.debug("Acquiring ZooKeeper lock. [resource={};type={};wait={}]", resource, type, wait);
+
+        InterProcessReadWriteLock lockEntry;
+        final String zkPath = LOCKS_NODE + "/" + resource;
+        LOG.debug("Checking existing Curator lock or creating new one. [zkPath={}]", zkPath);
+
+        // Creating a Curator InterProcessReadWriteLock is lightweight - only calling acquire() costs real ZooKeeper calls
+        final InterProcessReadWriteLock newLockEntry = new InterProcessReadWriteLock(zk.getClient(), zkPath);
+        final InterProcessReadWriteLock existingLockEntry = zkLocks.putIfAbsent(resource, newLockEntry);
+        if (existingLockEntry == null) {
+            lockEntry = newLockEntry;
+            LOG.debug("No existing Curator lock present, new one created successfully. [zkPath={}]", zkPath);
         }
-        InterProcessMutex lock = (type.equals(Type.READ)) ? lockEntry.readLock() : lockEntry.writeLock();
+        else {
+            // We can't destoy newLockEntry and we don't have to - it's taken care of by Curator and JVM GC
+            lockEntry = existingLockEntry;
+            LOG.debug("Reusing existing Curator lock. [zkPath={}]", zkPath);
+        }
+
         ZKLockToken token = null;
         try {
+            LOG.debug("Calling Curator to acquire ZooKeeper lock. [resource={};type={};wait={}]", resource, type, wait);
+            final InterProcessMutex lock = (type.equals(Type.READ)) ? lockEntry.readLock() : lockEntry.writeLock();
             if (wait == -1) {
                 lock.acquire();
                 token = new ZKLockToken(lockEntry, type);
+                LOG.debug("ZooKeeper lock acquired successfully. [resource={};type={}]", resource, type);
             }
             else if (lock.acquire(wait, TimeUnit.MILLISECONDS)) {
                 token = new ZKLockToken(lockEntry, type);
+                LOG.debug("ZooKeeper lock acquired successfully waiting. [resource={};type={};wait={}]", resource, type, wait);
+            }
+            else {
+                LOG.warn("Could not acquire ZooKeeper lock, timed out. [resource={};type={};wait={}]", resource, type, wait);
             }
         }
-        catch (Exception ex) {
+        catch (final Exception ex) {
             //Not throwing exception. Should return null, so that command can be requeued
+            LOG.warn("Could not acquire lock due to a ZooKeeper error. " +
+                    "[ex={};resource={};type={};wait={}]", ex, resource, type, wait);
             LOG.error("Error while acquiring lock", ex);
         }
+
         return token;
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/ee359d7b/core/src/main/java/org/apache/oozie/util/XLog.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/XLog.java b/core/src/main/java/org/apache/oozie/util/XLog.java
index cc9abd8..9ecac4d 100644
--- a/core/src/main/java/org/apache/oozie/util/XLog.java
+++ b/core/src/main/java/org/apache/oozie/util/XLog.java
@@ -672,12 +672,24 @@ public class XLog implements Log {
     public static String format(String msgTemplate, Object... params) {
         ParamChecker.notEmpty(msgTemplate, "msgTemplate");
         msgTemplate = msgTemplate.replace("{E}", System.getProperty("line.separator"));
+        msgTemplate = replaceEmptyPositions(msgTemplate);
         if (params != null && params.length > 0) {
             msgTemplate = MessageFormat.format(msgTemplate, params);
         }
         return msgTemplate;
     }
 
+    private static String replaceEmptyPositions(String msgTemplate) {
+        int pos = 0;
+
+        while (msgTemplate.contains("{}")) {
+            msgTemplate = msgTemplate.replace("{}", String.format("{%d}", pos));
+            pos++;
+        }
+
+        return msgTemplate;
+    }
+
     /**
      * Utility method that extracts the <code>Throwable</code>, if present, from the parameters.
      *

http://git-wip-us.apache.org/repos/asf/oozie/blob/ee359d7b/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 11ae948..23aa9ae 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.4.0 release (trunk - unreleased)
 
+OOZIE-2843 Enhance logging inside ZKLocksService and MemoryLocksService (andras.piros via pbacsko)
 OOZIE-2818 Can't overwrite oozie.action.max.output.data on a per-workflow basis (asasvari via pbacsko)
 OOZIE-2827 More directly view of the coordinator\u2019s history from perspective of workflow action. (Alonzo Zhou via pbacsko)
 OOZIE-2864 Maven artifacts for package com.codahale.metrics have inconsistent groupId (andras.piros via pbacsko)