You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/09/12 21:19:57 UTC

[16/26] samza git commit: SAMZA-1375: Implement Lock for Azure using Lease Blob

SAMZA-1375: Implement Lock for Azure using Lease Blob

Author: PawasChhokra <pa...@gmail.com>
Author: PawasChhokra <Jaimatadi1$>

Reviewers: Navina Ramesh <na...@apache.org>

Closes #280 from PawasChhokra/AzureDistributedLock


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/67f7214a
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/67f7214a
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/67f7214a

Branch: refs/heads/0.14.0
Commit: 67f7214aace6f9f98492b53f0a10d703aa8706c1
Parents: c3b447e
Author: Pawas Chhokra <pa...@gmail.com>
Authored: Fri Aug 18 17:49:52 2017 -0700
Committer: navina <na...@apache.org>
Committed: Fri Aug 18 17:49:52 2017 -0700

----------------------------------------------------------------------
 .../AzureCoordinationServiceFactory.java        |  30 ++++++
 .../coordinator/AzureCoordinationUtils.java     |  57 +++++++++++
 .../coordinator/AzureJobCoordinatorFactory.java |  29 ++++++
 .../org/apache/samza/coordinator/AzureLock.java | 100 +++++++++++++++++++
 .../samza/coordinator/DistributedLock.java      |  39 ++++++++
 5 files changed, 255 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/67f7214a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationServiceFactory.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationServiceFactory.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationServiceFactory.java
new file mode 100644
index 0000000..8016fbe
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationServiceFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator;
+
+import org.apache.samza.config.Config;
+
+/**
+ * {@link CoordinationServiceFactory} implementation that hands out {@link AzureCoordinationUtils} instances.
+ */
+public class AzureCoordinationServiceFactory implements CoordinationServiceFactory {
+
+  /**
+   * Creates a new {@link AzureCoordinationUtils} from the given config.
+   *
+   * @param groupId not used by this implementation
+   * @param participantId not used by this implementation
+   * @param config configuration handed to the {@link AzureCoordinationUtils} constructor
+   * @return a newly constructed {@link AzureCoordinationUtils}
+   */
+  @Override
+  public CoordinationUtils getCoordinationService(String groupId, String participantId, Config config) {
+    return new AzureCoordinationUtils(config);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/67f7214a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
new file mode 100644
index 0000000..f76dde3
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator;
+
+import org.apache.samza.AzureClient;
+import org.apache.samza.config.AzureConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.util.BlobUtils;
+
+
+/**
+ * {@link CoordinationUtils} implementation for Azure. Of the coordination primitives exposed here,
+ * only {@link #getLock(String)} returns a usable object; the leader elector and latch accessors return null.
+ */
+public class AzureCoordinationUtils implements CoordinationUtils {
+
+  private final AzureConfig azureConfig;
+  private final AzureClient client;
+
+  /**
+   * Wraps the given config in an {@link AzureConfig} and creates an {@link AzureClient}
+   * from the value returned by {@code getAzureConnect()}.
+   *
+   * @param config configuration to wrap
+   */
+  public AzureCoordinationUtils(Config config) {
+    azureConfig = new AzureConfig(config);
+    this.client = new AzureClient(azureConfig.getAzureConnect());
+  }
+
+  /**
+   * Does nothing in this implementation.
+   */
+  @Override
+  public void reset() {}
+
+
+  /**
+   * Always returns null.
+   * NOTE(review): the signature declares {@link UnsupportedOperationException} but the method never throws it;
+   * callers must null-check the result. Confirm whether throwing was intended.
+   *
+   * @return null
+   */
+  @Override
+  public LeaderElector getLeaderElector() throws UnsupportedOperationException {
+    return null;
+  }
+
+  /**
+   * Always returns null; both arguments are ignored.
+   * NOTE(review): the signature declares {@link UnsupportedOperationException} but the method never throws it;
+   * callers must null-check the result. Confirm whether throwing was intended.
+   *
+   * @param size not used by this implementation
+   * @param latchId not used by this implementation
+   * @return null
+   */
+  @Override
+  public Latch getLatch(int size, String latchId) throws UnsupportedOperationException {
+    return null;
+  }
+
+  /**
+   * Creates an {@link AzureLock} backed by a blob whose name is the configured blob name
+   * with {@code initLockName} appended.
+   *
+   * @param initLockName suffix appended to the configured blob name
+   * @return a new {@link AzureLock} wrapping the blob
+   */
+  public DistributedLock getLock(String initLockName) {
+    // Container name and blob length come from the Azure config; only the blob name varies per lock.
+    BlobUtils blob = new BlobUtils(client, azureConfig.getAzureContainerName(),
+        azureConfig.getAzureBlobName() + initLockName, azureConfig.getAzureBlobLength());
+    return new AzureLock(blob);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/67f7214a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinatorFactory.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinatorFactory.java
new file mode 100644
index 0000000..8b3d357
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinatorFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator;
+
+import org.apache.samza.config.Config;
+
+/**
+ * {@link JobCoordinatorFactory} implementation that creates {@link AzureJobCoordinator} instances.
+ */
+public class AzureJobCoordinatorFactory implements JobCoordinatorFactory {
+  /**
+   * Creates a new {@link AzureJobCoordinator} from the given config.
+   *
+   * @param config configuration handed to the {@link AzureJobCoordinator} constructor
+   * @return a newly constructed {@link AzureJobCoordinator}
+   */
+  @Override
+  public JobCoordinator getJobCoordinator(Config config) {
+    return new AzureJobCoordinator(config);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/67f7214a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java
new file mode 100644
index 0000000..0ef1b83
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator;
+
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.samza.AzureException;
+import org.apache.samza.util.BlobUtils;
+import org.apache.samza.util.LeaseBlobManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Distributed lock primitive for Azure.
+ * The lock is held for as long as this instance owns a lease on the underlying blob; the lease ID
+ * returned by the {@link LeaseBlobManager} is remembered so that it can be released in {@link #unlock()}.
+ */
+public class AzureLock implements DistributedLock {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AzureLock.class);
+  private static final int LEASE_TIME_IN_SEC = 60;
+  // Exclusive upper bound, in milliseconds, of the random pause between two acquire attempts.
+  private static final int MAX_RETRY_BACKOFF_MS = 1000;
+  private final AtomicBoolean hasLock;
+  private final AtomicReference<String> leaseId;
+  private final LeaseBlobManager leaseBlobManager;
+
+  /**
+   * Creates a lock backed by the blob exposed by the given {@link BlobUtils}. The lock starts out not held.
+   *
+   * @param blobUtils provides the blob on which the lease is taken
+   */
+  public AzureLock(BlobUtils blobUtils) {
+    this.hasLock = new AtomicBoolean(false);
+    this.leaseBlobManager = new LeaseBlobManager(blobUtils.getBlob());
+    this.leaseId = new AtomicReference<>(null);
+  }
+
+  /**
+   * Tries to acquire the lock by leasing the blob. While the lease cannot be obtained, the attempt is
+   * repeated after a random pause of less than one second, until the timeout elapses.
+   *
+   * @param timeout Duration after which timeout occurs.
+   * @param unit Time Unit of the timeout defined above.
+   * @return true if the lock was acquired successfully; false if the timeout elapsed without acquiring it,
+   *         if an {@link AzureException} was raised while acquiring the lease, or if the calling thread was
+   *         interrupted while waiting (in which case the thread's interrupt flag is left set).
+   */
+  @Override
+  public boolean lock(long timeout, TimeUnit unit) {
+    // System.nanoTime() is monotonic, so the deadline is not affected by wall-clock adjustments.
+    final long startTime = System.nanoTime();
+    final long lockTimeout = unit.toNanos(timeout);
+    final Random random = new Random();
+
+    while ((System.nanoTime() - startTime) < lockTimeout) {
+      try {
+        leaseId.set(leaseBlobManager.acquireLease(LEASE_TIME_IN_SEC, leaseId.get()));
+      } catch (AzureException e) {
+        // Keep the cause visible instead of silently reporting a failed lock attempt.
+        LOG.error("Failed to acquire lock.", e);
+        return false;
+      }
+      if (leaseId.get() != null) {
+        LOG.info("Acquired lock!");
+        hasLock.set(true);
+        return true;
+      }
+      try {
+        Thread.sleep(random.nextInt(MAX_RETRY_BACKOFF_MS));
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag for the caller and stop waiting: continuing would make every
+        // following sleep throw immediately and turn the retry loop into a busy spin.
+        Thread.currentThread().interrupt();
+        LOG.warn("Interrupted while waiting to acquire lock.");
+        return false;
+      }
+      LOG.info("Trying to acquire lock again...");
+    }
+    return false;
+  }
+
+  /**
+   * Unlocks, by releasing the lease on the blob. Does nothing (apart from logging) when this instance
+   * does not currently hold a lease.
+   */
+  @Override
+  public void unlock() {
+    final String currentLeaseId = leaseId.get();
+    if (currentLeaseId == null) {
+      // No lease ID was recorded by lock(), so there is nothing this instance could release.
+      LOG.warn("Unable to unlock: lock is not held.");
+      return;
+    }
+    if (leaseBlobManager.releaseLease(currentLeaseId)) {
+      LOG.info("Unlocked successfully.");
+      hasLock.set(false);
+      leaseId.set(null);
+    } else {
+      LOG.warn("Unable to unlock.");
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/67f7214a/samza-azure/src/main/java/org/apache/samza/coordinator/DistributedLock.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/DistributedLock.java b/samza-azure/src/main/java/org/apache/samza/coordinator/DistributedLock.java
new file mode 100644
index 0000000..6972cd9
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/DistributedLock.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator;
+
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * A lock that is acquired with a bounded wait and released explicitly.
+ */
+public interface DistributedLock {
+
+  /**
+   * Tries to acquire the lock, waiting at most the given amount of time.
+   * @param timeout Maximum duration to wait for the lock to be acquired.
+   * @param unit Time Unit of the timeout defined above.
+   * @return true if lock is acquired successfully, false if it times out.
+   */
+  boolean lock(long timeout, TimeUnit unit);
+
+  /**
+   * Releases the lock.
+   */
+  void unlock();
+}
\ No newline at end of file