You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by rk...@apache.org on 2014/06/10 22:57:49 UTC

git commit: OOZIE-1715 Distributed ID sequence for HA (puru via rkanter)

Repository: oozie
Updated Branches:
  refs/heads/master 659c45c4d -> ecedf6dc6


OOZIE-1715 Distributed ID sequence for HA (puru via rkanter)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/ecedf6dc
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/ecedf6dc
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/ecedf6dc

Branch: refs/heads/master
Commit: ecedf6dc6f10c60298fd33653afc79b1f8fbc21d
Parents: 659c45c
Author: Robert Kanter <rk...@cloudera.com>
Authored: Tue Jun 10 13:57:14 2014 -0700
Committer: Robert Kanter <rk...@cloudera.com>
Committed: Tue Jun 10 13:57:14 2014 -0700

----------------------------------------------------------------------
 .../org/apache/oozie/service/UUIDService.java   |  38 +++-
 .../org/apache/oozie/service/ZKUUIDService.java | 174 ++++++++++++++++
 .../java/org/apache/oozie/util/ZKUtils.java     |  11 +-
 core/src/main/resources/oozie-default.xml       |   8 +
 .../apache/oozie/service/TestZKUUIDService.java | 203 +++++++++++++++++++
 docs/src/site/twiki/AG_Install.twiki            |  20 +-
 release-log.txt                                 |   1 +
 7 files changed, 443 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/ecedf6dc/core/src/main/java/org/apache/oozie/service/UUIDService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/UUIDService.java b/core/src/main/java/org/apache/oozie/service/UUIDService.java
index 836815d..7489a53 100644
--- a/core/src/main/java/org/apache/oozie/service/UUIDService.java
+++ b/core/src/main/java/org/apache/oozie/service/UUIDService.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -56,7 +56,7 @@ public class UUIDService implements Service {
         String genType = services.getConf().get(CONF_GENERATOR, "counter").trim();
         if (genType.equals("counter")) {
             counter = new AtomicLong();
-            startTime = new SimpleDateFormat("yyMMddHHmmssSSS").format(new Date());
+            startTime = getStartTime();
         }
         else {
             if (!genType.equals("random")) {
@@ -76,6 +76,14 @@ public class UUIDService implements Service {
     }
 
     /**
+     * Returns the current time formatted with the pattern yyMMddHHmmssSSS.
+     * @return the formatted timestamp; this value is assigned to startTime when the "counter" generator is set up
+     */
+    public String getStartTime() {
+        return new SimpleDateFormat("yyMMddHHmmssSSS").format(new Date());
+    }
+
+    /**
      * Return the public interface for UUID service.
      *
      * @return {@link UUIDService}.
@@ -103,8 +111,20 @@ public class UUIDService implements Service {
     public String generateId(ApplicationType type) {
         StringBuilder sb = new StringBuilder();
 
+        sb.append(getSequence());
+        sb.append('-').append(systemId);
+        sb.append('-').append(type.getType());
+        // limitation due to current DB schema for action ID length (100)
+        if (sb.length() > 40) {
+            throw new RuntimeException(XLog.format("ID exceeds limit of 40 characters, [{0}]", sb));
+        }
+        return sb.toString();
+    }
+
+    public String getSequence() {
+        StringBuilder sb = new StringBuilder();
         if (counter != null) {
-            sb.append(longPadding(counter.getAndIncrement())).append('-').append(startTime);
+            sb.append(longPadding(getID())).append('-').append(startTime);
         }
         else {
             sb.append(UUID.randomUUID().toString());
@@ -112,15 +132,13 @@ public class UUIDService implements Service {
                 sb.setLength(37 - systemId.length());
             }
         }
-        sb.append('-').append(systemId);
-        sb.append('-').append(type.getType());
-        // limitation due to current DB schema for action ID length (100)
-        if (sb.length() > 40) {
-            throw new RuntimeException(XLog.format("ID exceeds limit of 40 characters, [{0}]", sb));
-        }
         return sb.toString();
     }
 
+    public long getID() {
+        return counter.getAndIncrement(); // NOTE(review): counter looks unset unless generator is "counter" - NPE risk, confirm
+    }
+
     /**
      * Create a child ID.
      * <p/>

http://git-wip-us.apache.org/repos/asf/oozie/blob/ecedf6dc/core/src/main/java/org/apache/oozie/service/ZKUUIDService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/ZKUUIDService.java b/core/src/main/java/org/apache/oozie/service/ZKUUIDService.java
new file mode 100644
index 0000000..33d782b
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/service/ZKUUIDService.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.service;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.apache.curator.framework.recipes.atomic.AtomicValue;
+import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.lock.LockToken;
+import org.apache.oozie.util.XLog;
+import org.apache.oozie.util.ZKUtils;
+
+/**
+ * Service that provides distributed job id sequence via ZooKeeper.  Requires that a ZooKeeper ensemble is available.
+ * The sequence path will be located under a ZNode named "job_id_sequence" under the namespace (see {@link ZKUtils}).
+ * The sequence will be reset to 0, once max is reached.
+ */
+
+public class ZKUUIDService extends UUIDService {
+
+    public static final String CONF_PREFIX = Service.CONF_PREFIX + "ZKUUIDService.";
+
+    public static final String CONF_SEQUENCE_MAX = CONF_PREFIX + "jobid.sequence.max";
+
+    public static final String ZK_SEQUENCE_PATH = "job_id_sequence";
+    public static final long DEFULT_SEQUENCE_MAX = 99999999990l;
+    public static final long RESET_VALUE = 0l;
+    public static final int RETRY_COUNT = 3;
+
+    private final static XLog LOG = XLog.getLog(ZKUUIDService.class);
+
+    private ZKUtils zk;
+    Long maxSequence;
+
+    DistributedAtomicLong atomicIdGenerator;
+
+    @Override
+    public void init(Services services) throws ServiceException {
+        // Base UUIDService setup runs first; any failure in the ZooKeeper setup below is rethrown as E1700.
+        super.init(services);
+        try {
+            zk = ZKUtils.register(this);
+            maxSequence = services.getConf().getLong(CONF_SEQUENCE_MAX, DEFULT_SEQUENCE_MAX);
+            atomicIdGenerator = new DistributedAtomicLong(zk.getClient(), ZK_SEQUENCE_PATH, ZKUtils.getRetryPloicy());
+        }
+        catch (Exception ex) {
+            throw new ServiceException(ErrorCode.E1700, ex.getMessage(), ex);
+        }
+
+    }
+
+    /**
+     * Gets the next unique id from the distributed sequence; failures surface as unchecked RuntimeException.
+     *
+     * @return the id
+     */
+    @Override
+    public long getID() {
+        return getZKId(0);
+    }
+
+    /**
+     * Increments the ZooKeeper counter and returns its pre-increment value; once that value reaches maxSequence the
+     * sequence is reset and the increment retried (at most RETRY_COUNT resets per call chain).
+     * @throws RuntimeException if the increment fails or the sequence cannot be reset
+     */
+    private long getZKId(int retryCount) {
+        if (atomicIdGenerator == null) {
+            throw new RuntimeException("Sequence generator can't be null. Path : " + ZK_SEQUENCE_PATH);
+        }
+        AtomicValue<Long> value;
+        try {
+            value = atomicIdGenerator.increment();
+        }
+        catch (Exception e) {
+            // Thrown outside of any finally block so the underlying failure stays attached as the cause.
+            throw new RuntimeException("Exception incrementing UID for session ", e);
+        }
+        if (value == null || !value.succeeded()) {
+            throw new RuntimeException("Exception incrementing UID for session ");
+        }
+        if (value.preValue() < maxSequence) {
+            return value.preValue();
+        }
+        if (retryCount >= RETRY_COUNT) {
+            throw new RuntimeException("Can't reset sequence. Tried " + retryCount + " times");
+        }
+        resetSequence();
+        return getZKId(retryCount + 1);
+    }
+
+    /**
+     * Resets the counter to RESET_VALUE, unless it is already below maxSequence or the write lock is unavailable.
+     */
+    private void resetSequence() {
+        synchronized (ZKUUIDService.class) {
+            try {
+                // Re-read the counter: another thread may have reset it while this one waited for the monitor.
+                AtomicValue<Long> value = atomicIdGenerator.get();
+                if (value.succeeded()) {
+                    if (value.postValue() < maxSequence) {
+                        return;
+                    }
+                }
+                else {
+                    throw new RuntimeException("Can't reset sequence");
+                }
+                // Cross-host guard: write lock from the registered MemoryLocksService (presumably the ZK-backed one - confirm).
+                LockToken lock = Services.get().get(MemoryLocksService.class)
+                        .getWriteLock(ZKUUIDService.class.getName(), lockTimeout);
+                try {
+                    if (lock == null) {
+                        LOG.info("Lock is held by other system, returning");
+                        return;
+                    }
+                    else {
+                        value = atomicIdGenerator.get();
+                        if (value.succeeded()) {
+                            if (value.postValue() < maxSequence) {
+                                return;
+                            }
+                        }
+                        else {
+                            throw new RuntimeException("Can't reset sequence");
+                        }
+                        atomicIdGenerator.forceSet(RESET_VALUE);
+                    }
+                }
+                finally {
+                    if (lock != null) {
+                        lock.release();
+                    }
+                }
+            }
+            catch (Exception e) {
+                throw new RuntimeException("Can't reset sequence", e);
+            }
+        }
+    }
+
+    /** Returns the current time formatted as yyMMddHHmmss (second precision). */
+    @Override
+    public String getStartTime() {
+        SimpleDateFormat startTimeFormat = new SimpleDateFormat("yyMMddHHmmss");
+        return startTimeFormat.format(new Date());
+    }
+
+    @Override
+    public void destroy() {
+        if (null != zk) {
+            zk.unregister(this); // give up the ZKUtils registration before the base service shuts down
+            zk = null;
+        }
+        super.destroy();
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/ecedf6dc/core/src/main/java/org/apache/oozie/util/ZKUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/ZKUtils.java b/core/src/main/java/org/apache/oozie/util/ZKUtils.java
index 56055b8..885b656 100644
--- a/core/src/main/java/org/apache/oozie/util/ZKUtils.java
+++ b/core/src/main/java/org/apache/oozie/util/ZKUtils.java
@@ -153,7 +153,7 @@ public class ZKUtils {
 
     private void createClient() throws Exception {
         // Connect to the ZooKeeper server
-        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+        RetryPolicy retryPolicy = ZKUtils.getRetryPloicy();
         String zkConnectionString = Services.get().getConf().get(ZK_CONNECTION_STRING, "localhost:2181");
         String zkNamespace = Services.get().getConf().get(ZK_NAMESPACE, "oozie");
         ACLProvider aclProvider;
@@ -375,4 +375,13 @@ public class ZKUtils {
             return saslACL;
         }
     }
+
+    /**
+     * Returns the retry policy used for ZooKeeper operations: a new ExponentialBackoffRetry(1000, 3) on every call.
+     *
+     * @return a freshly created RetryPolicy
+     */
+    public static RetryPolicy getRetryPloicy() {
+        return new ExponentialBackoffRetry(1000, 3);
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/ecedf6dc/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index 63a91fa..c9d9591 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -2113,5 +2113,13 @@
         </description>
     </property>
 
+    <property>
+        <name>oozie.service.ZKUUIDService.jobid.sequence.max</name>
+        <value>99999999990</value>
+        <description>
+        Maximum job id sequence for Oozie in HA mode. Current job id sequence is stored in ZK. Once the sequence reaches
+        maximum limit, server will reset job id sequence to 0.
+        </description>
+    </property>
 
 </configuration>

http://git-wip-us.apache.org/repos/asf/oozie/blob/ecedf6dc/core/src/test/java/org/apache/oozie/service/TestZKUUIDService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestZKUUIDService.java b/core/src/test/java/org/apache/oozie/service/TestZKUUIDService.java
new file mode 100644
index 0000000..c41383c
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/service/TestZKUUIDService.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.service;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.oozie.service.UUIDService.ApplicationType;
+import org.apache.oozie.test.ZKXTestCase;
+import org.apache.oozie.util.ZKUtils;
+
+public class TestZKUUIDService extends ZKXTestCase {
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        super.tearDown();
+    }
+
+    public void testRegisterUnregister() throws Exception {
+        assertEquals(0, ZKUtils.getUsers().size());
+        ZKUUIDService service = new ZKUUIDService();
+        try {
+            service.init(Services.get());
+            assertEquals(1, ZKUtils.getUsers().size());
+            assertEquals(service, ZKUtils.getUsers().iterator().next());
+            service.destroy();
+            assertEquals(0, ZKUtils.getUsers().size());
+        }
+        finally {
+            service.destroy();
+        }
+    }
+
+    public void testIDGeneration() throws Exception {
+        ZKUUIDService idService = new ZKUUIDService();
+        try {
+            // The first id carries sequence 0; after 1000 further ids the last one must carry sequence 1000.
+            setSystemProperty(UUIDService.CONF_GENERATOR, "counter");
+            idService.init(Services.get());
+            String lastId = idService.generateId(ApplicationType.WORKFLOW);
+            assertTrue(lastId.startsWith("0000000-"));
+            for (int remaining = 1000; remaining > 0; remaining--) {
+                lastId = idService.generateId(ApplicationType.WORKFLOW);
+            }
+            assertTrue(lastId.startsWith("0001000-"));
+        }
+        finally {
+            idService.destroy();
+        }
+    }
+
+    public void testMultipleIDGeneration() throws Exception {
+        ZKUUIDService uuid1 = new ZKUUIDService();
+        ZKUUIDService uuid2 = new ZKUUIDService();
+        // Two services alternate requests; they must draw consecutive numbers from one shared sequence.
+        try {
+            setSystemProperty(UUIDService.CONF_GENERATOR, "counter");
+            uuid1.init(Services.get());
+            uuid2.init(Services.get());
+            for (int i = 0; i < 1000; i += 2) {
+                String id1 = uuid1.generateId(ApplicationType.WORKFLOW);
+                String id2 = uuid2.generateId(ApplicationType.WORKFLOW);
+                assertEquals(i, Integer.parseInt(id1.substring(0, 7)));
+                assertEquals(i + 1, Integer.parseInt(id2.substring(0, 7)));
+            }
+        }
+        finally {
+            uuid1.destroy();
+            uuid2.destroy();
+        }
+
+    }
+
+    public void testMultipleIDGeneration_withMultiThread() throws Exception {
+        // result[n] is set when sequence number n is generated; all 10000 flags set means no id was handed out twice.
+        final boolean[] result = new boolean[10000];
+        final ZKUUIDService uuid1 = new ZKUUIDService();
+        final ZKUUIDService uuid2 = new ZKUUIDService();
+        setSystemProperty(UUIDService.CONF_GENERATOR, "counter");
+        uuid1.init(Services.get());
+        uuid2.init(Services.get());
+        try {
+            Thread t1 = new Thread() {
+                public void run() {
+                    for (int i = 0; i < 5000; i++) {
+                        String id = uuid1.generateId(ApplicationType.WORKFLOW);
+                        result[Integer.parseInt(id.substring(0, 7))] = true;
+                    }
+                }
+            };
+            Thread t2 = new Thread() {
+                public void run() {
+                    for (int i = 0; i < 5000; i++) {
+                        String id = uuid2.generateId(ApplicationType.WORKFLOW);
+                        result[Integer.parseInt(id.substring(0, 7))] = true;
+                    }
+                }
+            };
+            t1.start();
+            t2.start();
+            t1.join();
+            t2.join();
+            for (int i = 0; i < 10000; i++) {
+                assertTrue(result[i]);
+            }
+        }
+        finally {
+            uuid1.destroy();
+            uuid2.destroy();
+        }
+    }
+
+    public void testResetSequence() throws Exception {
+        Services services = Services.get();
+        services.setService(ZKLocksService.class);
+        ZKUUIDService idService = new ZKUUIDService();
+        try {
+            setSystemProperty(UUIDService.CONF_GENERATOR, "counter");
+            Services.get().getConf().set(ZKUUIDService.CONF_SEQUENCE_MAX, "900");
+            idService.init(services);
+            String lastId = idService.generateId(ApplicationType.WORKFLOW);
+            assertTrue(lastId.startsWith("0000000-"));
+            for (int remaining = 1000; remaining > 0; remaining--) {
+                lastId = idService.generateId(ApplicationType.WORKFLOW);
+            }
+            assertTrue(lastId.startsWith("0000100-")); // the sequence wraps at 900, so the 1001st id carries 100
+        }
+        finally {
+            idService.destroy();
+        }
+    }
+
+    public void testResetSequence_withMultiThread() throws Exception {
+        Services service = Services.get();
+        service.setService(ZKLocksService.class);
+
+        // result[n] counts how often sequence number n was handed out. With max 5000 and 10000 ids in total the
+        // sequence wraps once, so every count must end at 2. Updates are synchronized as both threads write.
+        final int[] result = new int[5000];
+        final ZKUUIDService uuid1 = new ZKUUIDService();
+        final ZKUUIDService uuid2 = new ZKUUIDService();
+        setSystemProperty(UUIDService.CONF_GENERATOR, "counter");
+        Services.get().getConf().set(ZKUUIDService.CONF_SEQUENCE_MAX, "5000");
+
+        uuid1.init(service);
+        uuid2.init(service);
+
+        try {
+            Thread t1 = new Thread() {
+                public void run() {
+                    for (int i = 0; i < 5000; i++) {
+                        String id = uuid1.generateId(ApplicationType.WORKFLOW);
+                        synchronized (result) {
+                            result[Integer.parseInt(id.substring(0, 7))]++;
+                        }
+                    }
+                }
+            };
+            Thread t2 = new Thread() {
+                public void run() {
+                    for (int i = 0; i < 5000; i++) {
+                        String id = uuid2.generateId(ApplicationType.WORKFLOW);
+                        synchronized (result) {
+                            result[Integer.parseInt(id.substring(0, 7))]++;
+                        }
+                    }
+                }
+            };
+            t1.start();
+            t2.start();
+            t1.join();
+            t2.join();
+            for (int i = 0; i < 5000; i++) {
+                assertEquals(2, result[i]);
+            }
+        }
+        finally {
+            uuid1.destroy();
+            uuid2.destroy();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/ecedf6dc/docs/src/site/twiki/AG_Install.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/AG_Install.twiki b/docs/src/site/twiki/AG_Install.twiki
index 89f7407..ce84e54 100644
--- a/docs/src/site/twiki/AG_Install.twiki
+++ b/docs/src/site/twiki/AG_Install.twiki
@@ -751,7 +751,8 @@ make Oozie use the ZooKeeper versions of these services instead of the default i
     <value>
         org.apache.oozie.service.ZKLocksService,
         org.apache.oozie.service.ZKXLogStreamingService,
-        org.apache.oozie.service.ZKJobsConcurrencyService
+        org.apache.oozie.service.ZKJobsConcurrencyService,
+        org.apache.oozie.service.ZKUUIDService
     </value>
 </property>
 </verbatim>
@@ -831,6 +832,23 @@ kerberos.removeHostFromPrincipal=true
 kerberos.removeRealmFromPrincipal=true
 </verbatim>
 
+---++++ JobId sequence
+In HA mode, Oozie uses ZK to generate the job id sequence. Job ids have the following format:
+<Id sequence>-<yyMMddHHmmss(server start time)>-<system_id>-<W/C/B>
+
+Where <system_id> is configured as =oozie.system.id= (default is "oozie-" + "user.name") and
+W/C/B is a suffix to the job id indicating whether the generated job is a workflow, coordinator or bundle.
+
+The maximum allowed length of a job id is 40 characters. "Id sequence" is stored in ZK and reset to 0 once the maximum job id
+sequence is reached. The maximum job id sequence is configured as =oozie.service.ZKUUIDService.jobid.sequence.max=, default value is 99999999990.
+
+<verbatim>
+<property>
+    <name>oozie.service.ZKUUIDService.jobid.sequence.max</name>
+    <value>99999999990</value>
+</property>
+</verbatim>
+
 ---++ Starting and Stopping Oozie
 
 Use the standard Tomcat commands to start and stop Oozie.

http://git-wip-us.apache.org/repos/asf/oozie/blob/ecedf6dc/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 8071b46..74b5dfd 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1715 Distributed ID sequence for HA (puru via rkanter)
 OOZIE-1870 Workflow action doen't resolve retry-max and retry-interval (puru via rohini)
 OOZIE-1686 Typo in DG_CommandLineTool (anbu78 via ryota)
 OOZIE-1804 Improve documentation for Coordinator Specification (lars_francke via rkanter)