You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/09/12 21:19:57 UTC
[16/26] samza git commit: SAMZA-1375: Implement Lock for Azure using
Lease Blob
SAMZA-1375: Implement Lock for Azure using Lease Blob
Author: PawasChhokra <pa...@gmail.com>
Author: PawasChhokra <Jaimatadi1$>
Reviewers: Navina Ramesh <na...@apache.org>
Closes #280 from PawasChhokra/AzureDistributedLock
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/67f7214a
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/67f7214a
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/67f7214a
Branch: refs/heads/0.14.0
Commit: 67f7214aace6f9f98492b53f0a10d703aa8706c1
Parents: c3b447e
Author: Pawas Chhokra <pa...@gmail.com>
Authored: Fri Aug 18 17:49:52 2017 -0700
Committer: navina <na...@apache.org>
Committed: Fri Aug 18 17:49:52 2017 -0700
----------------------------------------------------------------------
.../AzureCoordinationServiceFactory.java | 30 ++++++
.../coordinator/AzureCoordinationUtils.java | 57 +++++++++++
.../coordinator/AzureJobCoordinatorFactory.java | 29 ++++++
.../org/apache/samza/coordinator/AzureLock.java | 100 +++++++++++++++++++
.../samza/coordinator/DistributedLock.java | 39 ++++++++
5 files changed, 255 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/67f7214a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationServiceFactory.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationServiceFactory.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationServiceFactory.java
new file mode 100644
index 0000000..8016fbe
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationServiceFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator;
+
+import org.apache.samza.config.Config;
+
+public class AzureCoordinationServiceFactory implements CoordinationServiceFactory {
+
+ @Override
+ public CoordinationUtils getCoordinationService(String groupId, String participantId, Config config) {
+ return new AzureCoordinationUtils(config);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/67f7214a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
new file mode 100644
index 0000000..f76dde3
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator;
+
+import org.apache.samza.AzureClient;
+import org.apache.samza.config.AzureConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.util.BlobUtils;
+
+
+/**
+ * Azure implementation of {@link CoordinationUtils}.
+ * Only the distributed lock ({@link #getLock(String)}) is provided; leader election and
+ * latches are not implemented and report that via {@link UnsupportedOperationException}.
+ */
+public class AzureCoordinationUtils implements CoordinationUtils {
+
+  private final AzureConfig azureConfig;
+  private final AzureClient client;
+
+  /**
+   * Wraps the given configuration in an {@link AzureConfig} and creates the
+   * {@link AzureClient} from its connection setting.
+   *
+   * @param config job configuration holding the Azure settings
+   */
+  public AzureCoordinationUtils(Config config) {
+    this.azureConfig = new AzureConfig(config);
+    this.client = new AzureClient(azureConfig.getAzureConnect());
+  }
+
+  /**
+   * No-op: this class keeps no state that needs resetting.
+   */
+  @Override
+  public void reset() {}
+
+
+  /**
+   * Leader election is not implemented for Azure in this class.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public LeaderElector getLeaderElector() throws UnsupportedOperationException {
+    // Fail fast with the declared exception instead of returning null, which would only
+    // surface later as a NullPointerException at the caller.
+    throw new UnsupportedOperationException("LeaderElector is not supported by AzureCoordinationUtils.");
+  }
+
+  /**
+   * Latches are not implemented for Azure in this class.
+   *
+   * @param size latch size (unused)
+   * @param latchId latch identifier (unused)
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public Latch getLatch(int size, String latchId) throws UnsupportedOperationException {
+    throw new UnsupportedOperationException("Latch is not supported by AzureCoordinationUtils.");
+  }
+
+  /**
+   * Creates a lock backed by a blob whose name is the configured blob name
+   * followed by {@code initLockName}.
+   *
+   * @param initLockName suffix appended to the configured blob name
+   * @return a new {@link AzureLock} operating on that blob
+   */
+  public DistributedLock getLock(String initLockName) {
+    BlobUtils blob = new BlobUtils(client, azureConfig.getAzureContainerName(),
+        azureConfig.getAzureBlobName() + initLockName, azureConfig.getAzureBlobLength());
+    return new AzureLock(blob);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/67f7214a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinatorFactory.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinatorFactory.java
new file mode 100644
index 0000000..8b3d357
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinatorFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator;
+
+import org.apache.samza.config.Config;
+
+public class AzureJobCoordinatorFactory implements JobCoordinatorFactory {
+ @Override
+ public JobCoordinator getJobCoordinator(Config config) {
+ return new AzureJobCoordinator(config);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/67f7214a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java
new file mode 100644
index 0000000..0ef1b83
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator;
+
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.samza.AzureException;
+import org.apache.samza.util.BlobUtils;
+import org.apache.samza.util.LeaseBlobManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Distributed lock primitive for Azure.
+ */
+public class AzureLock implements DistributedLock {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AzureLock.class);
+ private static final int LEASE_TIME_IN_SEC = 60;
+ private AtomicBoolean hasLock;
+ private AtomicReference<String> leaseId;
+ private final LeaseBlobManager leaseBlobManager;
+
+ public AzureLock(BlobUtils blobUtils) {
+ this.hasLock = new AtomicBoolean(false);
+ leaseBlobManager = new LeaseBlobManager(blobUtils.getBlob());
+ this.leaseId = new AtomicReference<>(null);
+ }
+
+ /**
+ * Tries to acquire a lock in order to create intermediate streams. On failure to acquire lock, it keeps trying until the lock times out.
+ * The lock is acquired when the blob is leased successfully.
+ * @param timeout Duration after which timeout occurs.
+ * @param unit Time Unit of the timeout defined above.
+ * @return true if the lock was acquired successfully, false if lock acquire operation is unsuccessful even after subsequent tries within the timeout range.
+ */
+ @Override
+ public boolean lock(long timeout, TimeUnit unit) {
+ //Start timer for timeout
+ long startTime = System.currentTimeMillis();
+ long lockTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
+ Random random = new Random();
+
+ while ((System.currentTimeMillis() - startTime) < lockTimeout) {
+ try {
+ leaseId.getAndSet(leaseBlobManager.acquireLease(LEASE_TIME_IN_SEC, leaseId.get()));
+ } catch (AzureException e) {
+ return false;
+ }
+ if (leaseId.get() != null) {
+ LOG.info("Acquired lock!");
+ hasLock.getAndSet(true);
+ return true;
+ } else {
+ try {
+ Thread.sleep(random.nextInt(1000));
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ }
+ LOG.info("Trying to acquire lock again...");
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Unlocks, by releasing the lease on the blob.
+ */
+ @Override
+ public void unlock() {
+ boolean status = leaseBlobManager.releaseLease(leaseId.get());
+ if (status) {
+ LOG.info("Unlocked successfully.");
+ hasLock.getAndSet(false);
+ leaseId.getAndSet(null);
+ } else {
+ LOG.info("Unable to unlock.");
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/67f7214a/samza-azure/src/main/java/org/apache/samza/coordinator/DistributedLock.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/DistributedLock.java b/samza-azure/src/main/java/org/apache/samza/coordinator/DistributedLock.java
new file mode 100644
index 0000000..6972cd9
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/DistributedLock.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator;
+
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * A lock that can be acquired with a bounded wait and released afterwards.
+ */
+public interface DistributedLock {
+
+  /**
+   * Attempts to take the lock, giving up once the timeout has passed.
+   *
+   * @param timeout how long to keep trying before giving up
+   * @param unit time unit in which {@code timeout} is expressed
+   * @return true when the lock was obtained, false when the attempt timed out
+   */
+  boolean lock(long timeout, TimeUnit unit);
+
+  /**
+   * Gives the lock back.
+   */
+  void unlock();
+}
\ No newline at end of file