You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/08/09 01:26:54 UTC
samza git commit: SAMZA-1374: Implement Leader Election using Lease Blob in Azure
Repository: samza
Updated Branches:
refs/heads/master bf4c7619f -> 403042394
SAMZA-1374: Implement Leader Election using Lease Blob in Azure
PR 1: AzureClient + AzureConfig
PR 2: LeaseBlobManager
PR 3: BlobUtils + JobModelBundle
PR 4: TableUtils + ProcessorEntity
**PR 5: AzureLeaderElector** (current PR)
Author: PawasChhokra <Jaimatadi1$>
Author: PawasChhokra <pa...@gmail.com>
Reviewers: Navina Ramesh <na...@apache.org>, Shanthoosh Venkataraman <sv...@linkedin.com>
Closes #259 from PawasChhokra/LeaderElection
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/40304239
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/40304239
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/40304239
Branch: refs/heads/master
Commit: 403042394945deba1affa7269c7ead3129326cbd
Parents: bf4c761
Author: Pawas Chhokra <pa...@gmail.com>
Authored: Tue Aug 8 18:26:50 2017 -0700
Committer: navina <na...@apache.org>
Committed: Tue Aug 8 18:26:50 2017 -0700
----------------------------------------------------------------------
.../main/java/org/apache/samza/AzureClient.java | 10 ++
.../org/apache/samza/AzureLeaderElector.java | 111 +++++++++++++++++++
2 files changed, 121 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/40304239/samza-azure/src/main/java/org/apache/samza/AzureClient.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/AzureClient.java b/samza-azure/src/main/java/org/apache/samza/AzureClient.java
index 7c12055..2248d12 100644
--- a/samza-azure/src/main/java/org/apache/samza/AzureClient.java
+++ b/samza-azure/src/main/java/org/apache/samza/AzureClient.java
@@ -20,6 +20,9 @@
package org.apache.samza;
import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.RetryLinearRetry;
+import com.microsoft.azure.storage.RetryPolicy;
+import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.table.CloudTableClient;
import java.net.URISyntaxException;
@@ -47,7 +50,14 @@ public class AzureClient {
AzureClient(String storageConnectionString) {
try {
account = CloudStorageAccount.parse(storageConnectionString);
+
blobClient = account.createCloudBlobClient();
+ // Set retry policy for operations on the blob. In this case, every failed operation on the blob will be retried thrice, after 5 second intervals.
+ BlobRequestOptions options = new BlobRequestOptions();
+ RetryPolicy retryPolicy = new RetryLinearRetry(5000, 3);
+ options.setRetryPolicyFactory(retryPolicy);
+ blobClient.setDefaultRequestOptions(options);
+
tableClient = account.createCloudTableClient();
} catch (IllegalArgumentException | URISyntaxException e) {
LOG.error("Connection string {} specifies an invalid URI.", storageConnectionString);
http://git-wip-us.apache.org/repos/asf/samza/blob/40304239/samza-azure/src/main/java/org/apache/samza/AzureLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/AzureLeaderElector.java b/samza-azure/src/main/java/org/apache/samza/AzureLeaderElector.java
new file mode 100644
index 0000000..efa8ea1
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/AzureLeaderElector.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.samza.coordinator.LeaderElector;
+import org.apache.samza.coordinator.LeaderElectorListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Facilitates leader election in Azure by acquiring and releasing a lease through a {@link LeaseBlobManager}.
+ * The processor that acquires the lease on the blob becomes the leader.
+ * The lease ID is null initially. It is generated by Azure when the processor acquires the lease, stored by tryBecomeLeader(), and reset to null by resignLeadership().
+ * Every processor requires a valid active lease ID in order to perform successful write and delete operations on the blob.
+ * Read operations from the blob are not dependent on the lease ID.
+ */
+public class AzureLeaderElector implements LeaderElector {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AzureLeaderElector.class);
+ private static final int LEASE_TIME_IN_SEC = 60; // Passed as the first argument to acquireLease; presumably the lease duration in seconds - TODO confirm.
+ private final LeaseBlobManager leaseBlobManager;
+ private LeaderElectorListener leaderElectorListener = null; // Optional callback; stays null until setLeaderElectorListener is called.
+ private final AtomicReference<String> leaseId; // Current lease ID; null initially and after resignLeadership().
+ private final AtomicBoolean isLeader; // Set to true in tryBecomeLeader(), cleared in resignLeadership().
+
+ public AzureLeaderElector(LeaseBlobManager leaseBlobManager) {
+ this.isLeader = new AtomicBoolean(false);
+ this.leaseBlobManager = leaseBlobManager;
+ this.leaseId = new AtomicReference<>(null);
+ }
+
+ @Override
+ public void setLeaderElectorListener(LeaderElectorListener listener) {
+ this.leaderElectorListener = listener; // Replaces any previously set listener; a null listener disables the callback.
+ }
+
+ /**
+ * Tries to become the leader by acquiring a lease on the blob and storing the returned lease ID.
+ * The acquireLease operation has a retry policy where upon failure, the operation is tried 3 times at 5 second intervals.
+ * Invokes the listener (if one is set) on becoming the leader. An AzureException is logged and not rethrown.
+ */
+ @Override
+ public void tryBecomeLeader() {
+ try {
+ leaseId.getAndSet(leaseBlobManager.acquireLease(LEASE_TIME_IN_SEC, leaseId.get())); // Passes the current ID (possibly null); the previous value returned by getAndSet is discarded.
+ if (leaseId.get() != null) { // A null ID is treated as "lease not acquired": nothing else happens.
+ LOG.info("Became leader with lease ID {}.", leaseId.get());
+ isLeader.set(true);
+ if (leaderElectorListener != null) {
+ leaderElectorListener.onBecomingLeader();
+ }
+ }
+ } catch (AzureException e) {
+ LOG.error("Error while trying to acquire lease.", e); // Logged only; the failure is not propagated to the caller.
+ }
+ }
+
+ /**
+ * Releases the lease in order to resign leadership, then clears the leader flag and the stored lease ID. Nothing in this method stops schedulers.
+ * The releaseLease operation has a retry policy where upon failure, the operation is tried 3 times at 5 second intervals.
+ */
+ @Override
+ public void resignLeadership() {
+ if (isLeader.get()) {
+ leaseBlobManager.releaseLease(leaseId.get());
+ isLeader.set(false);
+ LOG.info("Resigning leadership with lease ID {}", leaseId.get()); // Logged after the release call, while the ID is still stored.
+ leaseId.getAndSet(null);
+ } else {
+ LOG.info("Can't release the lease because it is not the leader and does not hold an active lease.");
+ }
+ }
+
+ /**
+ * Checks whether it's a leader
+ * @return true if it is the leader, false otherwise
+ */
+ @Override
+ public boolean amILeader() {
+ return isLeader.get();
+ }
+
+ public AtomicReference<String> getLeaseId() {
+ return leaseId; // Exposes the live AtomicReference itself, not a copy of the current ID.
+ }
+
+ public LeaseBlobManager getLeaseBlobManager() {
+ return this.leaseBlobManager;
+ }
+
+}