You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by rk...@apache.org on 2014/06/10 22:57:49 UTC
git commit: OOZIE-1715 Distributed ID sequence for HA (puru via
rkanter)
Repository: oozie
Updated Branches:
refs/heads/master 659c45c4d -> ecedf6dc6
OOZIE-1715 Distributed ID sequence for HA (puru via rkanter)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/ecedf6dc
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/ecedf6dc
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/ecedf6dc
Branch: refs/heads/master
Commit: ecedf6dc6f10c60298fd33653afc79b1f8fbc21d
Parents: 659c45c
Author: Robert Kanter <rk...@cloudera.com>
Authored: Tue Jun 10 13:57:14 2014 -0700
Committer: Robert Kanter <rk...@cloudera.com>
Committed: Tue Jun 10 13:57:14 2014 -0700
----------------------------------------------------------------------
.../org/apache/oozie/service/UUIDService.java | 38 +++-
.../org/apache/oozie/service/ZKUUIDService.java | 174 ++++++++++++++++
.../java/org/apache/oozie/util/ZKUtils.java | 11 +-
core/src/main/resources/oozie-default.xml | 8 +
.../apache/oozie/service/TestZKUUIDService.java | 203 +++++++++++++++++++
docs/src/site/twiki/AG_Install.twiki | 20 +-
release-log.txt | 1 +
7 files changed, 443 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/ecedf6dc/core/src/main/java/org/apache/oozie/service/UUIDService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/UUIDService.java b/core/src/main/java/org/apache/oozie/service/UUIDService.java
index 836815d..7489a53 100644
--- a/core/src/main/java/org/apache/oozie/service/UUIDService.java
+++ b/core/src/main/java/org/apache/oozie/service/UUIDService.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -56,7 +56,7 @@ public class UUIDService implements Service {
String genType = services.getConf().get(CONF_GENERATOR, "counter").trim();
if (genType.equals("counter")) {
counter = new AtomicLong();
- startTime = new SimpleDateFormat("yyMMddHHmmssSSS").format(new Date());
+ startTime = getStartTime();
}
else {
if (!genType.equals("random")) {
@@ -76,6 +76,14 @@ public class UUIDService implements Service {
}
/**
+ * Get Server start time: returns the current time formatted with the pattern yyMMddHHmmssSSS.
+ * init() stores the result as the start time that is appended to counter-based ID sequences.
+ *
+ * @return the current time formatted as yyMMddHHmmssSSS
+ */
+ public String getStartTime() {
+ return new SimpleDateFormat("yyMMddHHmmssSSS").format(new Date());
+ }
+
+ /**
* Return the public interface for UUID service.
*
* @return {@link UUIDService}.
@@ -103,8 +111,20 @@ public class UUIDService implements Service {
public String generateId(ApplicationType type) {
StringBuilder sb = new StringBuilder();
+ sb.append(getSequence());
+ sb.append('-').append(systemId);
+ sb.append('-').append(type.getType());
+ // limitation due to current DB schema for action ID length (100)
+ if (sb.length() > 40) {
+ throw new RuntimeException(XLog.format("ID exceeds limit of 40 characters, [{0}]", sb));
+ }
+ return sb.toString();
+ }
+
+ public String getSequence() {
+ StringBuilder sb = new StringBuilder();
if (counter != null) {
- sb.append(longPadding(counter.getAndIncrement())).append('-').append(startTime);
+ sb.append(longPadding(getID())).append('-').append(startTime);
}
else {
sb.append(UUID.randomUUID().toString());
@@ -112,15 +132,13 @@ public class UUIDService implements Service {
sb.setLength(37 - systemId.length());
}
}
- sb.append('-').append(systemId);
- sb.append('-').append(type.getType());
- // limitation due to current DB schema for action ID length (100)
- if (sb.length() > 40) {
- throw new RuntimeException(XLog.format("ID exceeds limit of 40 characters, [{0}]", sb));
- }
return sb.toString();
}
+ /**
+ * Returns the current value of the counter and then increments it.
+ * <p/>
+ * NOTE(review): getSequence() only calls this when the counter is not null; the counter appears to be created
+ * only for the "counter" generator type, so a direct call with another generator type presumably fails with a
+ * NullPointerException - confirm.
+ *
+ * @return the counter value before the increment
+ */
+ public long getID() {
+ return counter.getAndIncrement();
+ }
+
/**
* Create a child ID.
* <p/>
http://git-wip-us.apache.org/repos/asf/oozie/blob/ecedf6dc/core/src/main/java/org/apache/oozie/service/ZKUUIDService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/ZKUUIDService.java b/core/src/main/java/org/apache/oozie/service/ZKUUIDService.java
new file mode 100644
index 0000000..33d782b
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/service/ZKUUIDService.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.service;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.apache.curator.framework.recipes.atomic.AtomicValue;
+import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.lock.LockToken;
+import org.apache.oozie.util.XLog;
+import org.apache.oozie.util.ZKUtils;
+
+/**
+ * Service that provides distributed job id sequence via ZooKeeper. Requires that a ZooKeeper ensemble is available.
+ * The sequence path will be located under a ZNode named "job_id_sequence" under the namespace (see {@link ZKUtils}).
+ * The sequence will be reset to 0, once max is reached.
+ */
+
+public class ZKUUIDService extends UUIDService {
+
+ public static final String CONF_PREFIX = Service.CONF_PREFIX + "ZKUUIDService.";
+
+ public static final String CONF_SEQUENCE_MAX = CONF_PREFIX + "jobid.sequence.max";
+
+ public static final String ZK_SEQUENCE_PATH = "job_id_sequence";
+ public static final long DEFULT_SEQUENCE_MAX = 99999999990l;
+ public static final long RESET_VALUE = 0l;
+ public static final int RETRY_COUNT = 3;
+
+ private final static XLog LOG = XLog.getLog(ZKUUIDService.class);
+
+ private ZKUtils zk;
+ Long maxSequence;
+
+ DistributedAtomicLong atomicIdGenerator;
+
+ @Override
+ public void init(Services services) throws ServiceException {
+
+ super.init(services);
+ try {
+ zk = ZKUtils.register(this);
+ maxSequence = services.getConf().getLong(CONF_SEQUENCE_MAX, DEFULT_SEQUENCE_MAX);
+ atomicIdGenerator = new DistributedAtomicLong(zk.getClient(), ZK_SEQUENCE_PATH, ZKUtils.getRetryPloicy());
+ }
+ catch (Exception ex) {
+ throw new ServiceException(ErrorCode.E1700, ex.getMessage(), ex);
+ }
+
+ }
+
+ /**
+ * Gets the unique id.
+ *
+ * @return the id
+ * @throws Exception the exception
+ */
+ public long getID() {
+ return getZKId(0);
+ }
+
+ @SuppressWarnings("finally")
+ private long getZKId(int retryCount) {
+ if (atomicIdGenerator == null) {
+ throw new RuntimeException("Sequence generator can't be null. Path : " + ZK_SEQUENCE_PATH);
+ }
+ AtomicValue<Long> value = null;
+ try {
+ value = atomicIdGenerator.increment();
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Exception incrementing UID for session ", e);
+ }
+ finally {
+ if (value != null && value.succeeded()) {
+ if (value.preValue() >= maxSequence) {
+ if (retryCount >= RETRY_COUNT) {
+ throw new RuntimeException("Can't reset sequence. Tried " + retryCount + " times");
+ }
+ resetSequence();
+ return getZKId(retryCount + 1);
+ }
+ return value.preValue();
+ }
+ else {
+ throw new RuntimeException("Exception incrementing UID for session ");
+ }
+ }
+
+ }
+
+ /**
+ * Once sequence is reached limit, reset to 0.
+ */
+ private void resetSequence() {
+ synchronized (ZKUUIDService.class) {
+ try {
+ // Double check if sequence is already reset.
+ AtomicValue<Long> value = atomicIdGenerator.get();
+ if (value.succeeded()) {
+ if (value.postValue() < maxSequence) {
+ return;
+ }
+ }
+ else {
+ throw new RuntimeException("Can't reset sequence");
+ }
+ // Acquire ZK lock, so that other host doesn't reset sequence.
+ LockToken lock = Services.get().get(MemoryLocksService.class)
+ .getWriteLock(ZKUUIDService.class.getName(), lockTimeout);
+ try {
+ if (lock == null) {
+ LOG.info("Lock is held by other system, returning");
+ return;
+ }
+ else {
+ value = atomicIdGenerator.get();
+ if (value.succeeded()) {
+ if (value.postValue() < maxSequence) {
+ return;
+ }
+ }
+ else {
+ throw new RuntimeException("Can't reset sequence");
+ }
+ atomicIdGenerator.forceSet(RESET_VALUE);
+ }
+ }
+ finally {
+ if (lock != null) {
+ lock.release();
+ }
+ }
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Can't reset sequence", e);
+ }
+ }
+ }
+
+ /**
+ * Get start time.
+ */
+ public String getStartTime(){
+ return new SimpleDateFormat("yyMMddHHmmss").format(new Date());
+ }
+
+ @Override
+ public void destroy() {
+ if (zk != null) {
+ zk.unregister(this);
+ }
+ zk = null;
+ super.destroy();
+ }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/ecedf6dc/core/src/main/java/org/apache/oozie/util/ZKUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/ZKUtils.java b/core/src/main/java/org/apache/oozie/util/ZKUtils.java
index 56055b8..885b656 100644
--- a/core/src/main/java/org/apache/oozie/util/ZKUtils.java
+++ b/core/src/main/java/org/apache/oozie/util/ZKUtils.java
@@ -153,7 +153,7 @@ public class ZKUtils {
private void createClient() throws Exception {
// Connect to the ZooKeeper server
- RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+ RetryPolicy retryPolicy = ZKUtils.getRetryPloicy();
String zkConnectionString = Services.get().getConf().get(ZK_CONNECTION_STRING, "localhost:2181");
String zkNamespace = Services.get().getConf().get(ZK_NAMESPACE, "oozie");
ACLProvider aclProvider;
@@ -375,4 +375,13 @@ public class ZKUtils {
return saslACL;
}
}
+
+ /**
+ * Creates the retry policy used for ZooKeeper operations: an exponential backoff with a base sleep time of
+ * 1000 ms and at most 3 retries.
+ *
+ * @return a new RetryPolicy instance
+ */
+ public static RetryPolicy getRetryPloicy() {
+ final int baseSleepTimeMs = 1000;
+ final int maxRetries = 3;
+ return new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/ecedf6dc/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index 63a91fa..c9d9591 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -2113,5 +2113,13 @@
</description>
</property>
+ <property>
+ <name>oozie.service.ZKUUIDService.jobid.sequence.max</name>
+ <value>99999999990</value>
+ <description>
+ Maximum job id sequence for Oozie in HA mode. The current job id sequence is stored in ZK. Once the sequence reaches
+ the maximum limit, the server will reset the job id sequence to 0.
+ </description>
+ </property>
</configuration>
http://git-wip-us.apache.org/repos/asf/oozie/blob/ecedf6dc/core/src/test/java/org/apache/oozie/service/TestZKUUIDService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestZKUUIDService.java b/core/src/test/java/org/apache/oozie/service/TestZKUUIDService.java
new file mode 100644
index 0000000..c41383c
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/service/TestZKUUIDService.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.service;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.oozie.service.UUIDService.ApplicationType;
+import org.apache.oozie.test.ZKXTestCase;
+import org.apache.oozie.util.ZKUtils;
+
+public class TestZKUUIDService extends ZKXTestCase {
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ }
+
+ public void testRegisterUnregister() throws Exception {
+ assertEquals(0, ZKUtils.getUsers().size());
+ ZKUUIDService zkUUUIDService = new ZKUUIDService();
+ try {
+ zkUUUIDService.init(Services.get());
+ assertEquals(1, ZKUtils.getUsers().size());
+ assertEquals(zkUUUIDService, ZKUtils.getUsers().iterator().next());
+ zkUUUIDService.destroy();
+ assertEquals(0, ZKUtils.getUsers().size());
+ }
+ finally {
+ zkUUUIDService.destroy();
+ }
+ }
+
+ public void testIDGeneration() throws Exception {
+ ZKUUIDService uuid = new ZKUUIDService();
+ try {
+
+ setSystemProperty(UUIDService.CONF_GENERATOR, "counter");
+ uuid.init(Services.get());
+ String id = uuid.generateId(ApplicationType.WORKFLOW);
+ assertTrue(id.startsWith("0000000-"));
+ for (int i = 0; i < 1000; i++) {
+ id = uuid.generateId(ApplicationType.WORKFLOW);
+ }
+ assertTrue(id.startsWith("0001000-"));
+ }
+ finally {
+ uuid.destroy();
+ }
+ }
+
+ public void testMultipleIDGeneration() throws Exception {
+ ZKUUIDService uuid1 = new ZKUUIDService();
+ ZKUUIDService uuid2 = new ZKUUIDService();
+
+ try {
+ setSystemProperty(UUIDService.CONF_GENERATOR, "counter");
+ uuid1.init(Services.get());
+ uuid2.init(Services.get());
+ for (int i = 0; i < 1000; i += 2) {
+ String id1 = uuid1.generateId(ApplicationType.WORKFLOW);
+ String id2 = uuid2.generateId(ApplicationType.WORKFLOW);
+ assertEquals(Integer.parseInt(id1.substring(0, 7)), i);
+ assertEquals(Integer.parseInt(id2.substring(0, 7)), i + 1);
+ }
+ }
+ finally {
+ uuid1.destroy();
+ uuid2.destroy();
+ }
+
+ }
+
+ public void testMultipleIDGeneration_withMultiThread() throws Exception {
+ final List<Boolean> result = new ArrayList<Boolean>(10000);
+ final ZKUUIDService uuid1 = new ZKUUIDService();
+ final ZKUUIDService uuid2 = new ZKUUIDService();
+ setSystemProperty(UUIDService.CONF_GENERATOR, "counter");
+ uuid1.init(Services.get());
+ uuid2.init(Services.get());
+
+ try {
+ Thread t1 = new Thread() {
+ public void run() {
+ for (int i = 0; i < 5000; i++) {
+ String id = uuid1.generateId(ApplicationType.WORKFLOW);
+ result.add(Integer.parseInt(id.substring(0, 7)), true);
+ }
+ }
+ };
+ Thread t2 = new Thread() {
+ public void run() {
+ for (int i = 0; i < 5000; i++) {
+ String id = uuid2.generateId(ApplicationType.WORKFLOW);
+ result.add(Integer.parseInt(id.substring(0, 7)), true);
+ }
+ }
+ };
+ t1.start();
+ t2.start();
+ t1.join();
+ t2.join();
+ for (int i = 0; i < 10000; i++) {
+ assertTrue(result.get(i));
+ }
+ }
+ finally {
+ uuid1.destroy();
+ uuid2.destroy();
+ }
+ }
+
+ public void testResetSequence() throws Exception {
+ Services service = Services.get();
+ service.setService(ZKLocksService.class);
+ ZKUUIDService uuid = new ZKUUIDService();
+ try {
+ setSystemProperty(UUIDService.CONF_GENERATOR, "counter");
+ Services.get().getConf().set(ZKUUIDService.CONF_SEQUENCE_MAX, "900");
+ uuid.init(service);
+ String id = uuid.generateId(ApplicationType.WORKFLOW);
+ assertTrue(id.startsWith("0000000-"));
+ for (int i = 0; i < 1000; i++) {
+ id = uuid.generateId(ApplicationType.WORKFLOW);
+ }
+ assertTrue(id.startsWith("0000100-"));
+ }
+ finally {
+ uuid.destroy();
+ }
+ }
+
+ public void testResetSequence_withMultiThread() throws Exception {
+ Services service = Services.get();
+ service.setService(ZKLocksService.class);
+
+ final List<Integer> result = new ArrayList<Integer>(5000);
+ final ZKUUIDService uuid1 = new ZKUUIDService();
+ final ZKUUIDService uuid2 = new ZKUUIDService();
+ setSystemProperty(UUIDService.CONF_GENERATOR, "counter");
+ Services.get().getConf().set(ZKUUIDService.CONF_SEQUENCE_MAX, "5000");
+
+ uuid1.init(service);
+ uuid2.init(service);
+
+ for (int i = 0; i < 5000; i++) {
+ result.add(i, i);
+ }
+
+ try {
+ Thread t1 = new Thread() {
+ public void run() {
+ for (int i = 0; i < 5000; i++) {
+ String id = uuid1.generateId(ApplicationType.WORKFLOW);
+ int index = Integer.parseInt(id.substring(0, 7));
+ result.add(index, result.get(index) + 1);
+ }
+ }
+ };
+ Thread t2 = new Thread() {
+ public void run() {
+ for (int i = 0; i < 5000; i++) {
+ String id = uuid2.generateId(ApplicationType.WORKFLOW);
+ int index = Integer.parseInt(id.substring(0, 7));
+ result.add(index, result.get(index) + 1);
+ }
+ }
+ };
+ t1.start();
+ t2.start();
+ t1.join();
+ t2.join();
+ for (int i = 0; i < 5000; i++) {
+ assertEquals(result.get(i), Integer.valueOf(2));
+ }
+ }
+ finally {
+ uuid1.destroy();
+ uuid2.destroy();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/ecedf6dc/docs/src/site/twiki/AG_Install.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/AG_Install.twiki b/docs/src/site/twiki/AG_Install.twiki
index 89f7407..ce84e54 100644
--- a/docs/src/site/twiki/AG_Install.twiki
+++ b/docs/src/site/twiki/AG_Install.twiki
@@ -751,7 +751,8 @@ make Oozie use the ZooKeeper versions of these services instead of the default i
<value>
org.apache.oozie.service.ZKLocksService,
org.apache.oozie.service.ZKXLogStreamingService,
- org.apache.oozie.service.ZKJobsConcurrencyService
+ org.apache.oozie.service.ZKJobsConcurrencyService,
+ org.apache.oozie.service.ZKUUIDService
</value>
</property>
</verbatim>
@@ -831,6 +832,23 @@ kerberos.removeHostFromPrincipal=true
kerberos.removeRealmFromPrincipal=true
</verbatim>
+---++++ JobId sequence
+In HA mode, Oozie uses ZK to generate the job id sequence. Job ids have the following format:
+<Id sequence>-<yyMMddHHmmss(server start time)>-<system_id>-<W/C/B>
+
+Where <system_id> is configured as =oozie.system.id= (default is "oozie-" + "user.name").
+W/C/B is a suffix of the job id indicating whether the generated job is a workflow, a coordinator or a bundle.
+
+The maximum allowed length of a job id is 40 characters. The "Id sequence" is stored in ZK and is reset to 0 once the maximum
+job id sequence is reached. The maximum job id sequence is configured as =oozie.service.ZKUUIDService.jobid.sequence.max=; the default value is 99999999990.
+
+<verbatim>
+<property>
+ <name>oozie.service.ZKUUIDService.jobid.sequence.max</name>
+ <value>99999999990</value>
+</property>
+</verbatim>
+
---++ Starting and Stopping Oozie
Use the standard Tomcat commands to start and stop Oozie.
http://git-wip-us.apache.org/repos/asf/oozie/blob/ecedf6dc/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 8071b46..74b5dfd 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 4.1.0 release (trunk - unreleased)
+OOZIE-1715 Distributed ID sequence for HA (puru via rkanter)
OOZIE-1870 Workflow action doen't resolve retry-max and retry-interval (puru via rohini)
OOZIE-1686 Typo in DG_CommandLineTool (anbu78 via ryota)
OOZIE-1804 Improve documentation for Coordinator Specification (lars_francke via rkanter)