You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/08/07 21:15:36 UTC

samza git commit: SAMZA-1376: Create a leasing utility class for blobs in Azure

Repository: samza
Updated Branches:
  refs/heads/master cf5efe761 -> f5c5cb223


SAMZA-1376: Create a leasing utility class for blobs in Azure

navina
PR 1: AzureClient + AzureConfig
**PR 2: LeaseBlobManager** (current PR)

Author: PawasChhokra <Jaimatadi1$>
Author: PawasChhokra <pa...@gmail.com>

Reviewers: Navina Ramesh <na...@apache.org>, Shanthoosh Venkataraman <sv...@linkedin.com>

Closes #256 from PawasChhokra/LeaseUtils


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f5c5cb22
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f5c5cb22
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f5c5cb22

Branch: refs/heads/master
Commit: f5c5cb2233f6e141c34ea372a49a7b07f163a359
Parents: cf5efe7
Author: Pawas Chhokra <pa...@gmail.com>
Authored: Mon Aug 7 14:15:23 2017 -0700
Committer: navina <na...@apache.org>
Committed: Mon Aug 7 14:15:23 2017 -0700

----------------------------------------------------------------------
 .gitignore                                      |  2 +-
 .../main/java/org/apache/samza/AzureClient.java | 16 +++-
 .../main/java/org/apache/samza/AzureConfig.java | 10 +-
 .../java/org/apache/samza/AzureException.java   | 43 +++++++++
 .../java/org/apache/samza/LeaseBlobManager.java | 99 ++++++++++++++++++++
 5 files changed, 160 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/f5c5cb22/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index d4dcaa1..7cbffe7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -28,4 +28,4 @@ docs/learn/documentation/*/rest/javadocs
 out/
 *.patch
 **.pyc
-samza-shell/src/main/visualizer/plan.json
+samza-shell/src/main/visualizer/plan.json
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/f5c5cb22/samza-azure/src/main/java/org/apache/samza/AzureClient.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/AzureClient.java b/samza-azure/src/main/java/org/apache/samza/AzureClient.java
index b5884cd..7c12055 100644
--- a/samza-azure/src/main/java/org/apache/samza/AzureClient.java
+++ b/samza-azure/src/main/java/org/apache/samza/AzureClient.java
@@ -38,19 +38,25 @@ public class AzureClient {
   private final CloudTableClient tableClient;
   private final CloudBlobClient blobClient;
 
+  /**
+   * Creates a reference to the Azure Storage account according to the connection string that the client passes.
+   * Also creates references to Azure Blob Storage and Azure Table Storage.
+   * @param storageConnectionString Connection string to connect to Azure Storage Account, format: "DefaultEndpointsProtocol=<https>;AccountName=<>;AccountKey=<>"
+   * @throws AzureException If an Azure storage service error occurred, or when the storageConnectionString is invalid.
+   */
   AzureClient(String storageConnectionString) {
     try {
       account = CloudStorageAccount.parse(storageConnectionString);
       blobClient = account.createCloudBlobClient();
       tableClient = account.createCloudTableClient();
     } catch (IllegalArgumentException | URISyntaxException e) {
-      LOG.error("\nConnection string {} specifies an invalid URI.", storageConnectionString);
+      LOG.error("Connection string {} specifies an invalid URI.", storageConnectionString);
       LOG.error("Please confirm the connection string is in the Azure connection string format.");
-      throw new SamzaException(e);
+      throw new AzureException(e);
     } catch (InvalidKeyException e) {
-      LOG.error("\nConnection string {} specifies an invalid key.", storageConnectionString);
+      LOG.error("Connection string {} specifies an invalid key.", storageConnectionString);
       LOG.error("Please confirm the AccountName and AccountKey in the connection string are valid.");
-      throw new SamzaException(e);
+      throw new AzureException(e);
     }
   }
 
@@ -61,4 +67,4 @@ public class AzureClient {
   public CloudTableClient getTableClient() {
     return tableClient;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/f5c5cb22/samza-azure/src/main/java/org/apache/samza/AzureConfig.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/AzureConfig.java b/samza-azure/src/main/java/org/apache/samza/AzureConfig.java
index b88d3c0..32b8082 100644
--- a/samza-azure/src/main/java/org/apache/samza/AzureConfig.java
+++ b/samza-azure/src/main/java/org/apache/samza/AzureConfig.java
@@ -25,16 +25,19 @@ import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.MapConfig;
 
 
+/**
+ * Config class for reading all user defined parameters for Azure driven coordination services.
+ */
 public class AzureConfig extends MapConfig {
 
   // Connection string for Azure Storage Account, format: "DefaultEndpointsProtocol=<https>;AccountName=<>;AccountKey=<>"
   public static final String AZURE_STORAGE_CONNECT = "azure.storage.connect";
   public static final String AZURE_PAGEBLOB_LENGTH = "job.coordinator.azure.blob.length";
+  public static final long DEFAULT_AZURE_PAGEBLOB_LENGTH = 5120000;
 
   private static String containerName;
   private static String blobName;
   private static String tableName;
-  public static final long DEFAULT_AZURE_PAGEBLOB_LENGTH = 5120000;
 
   public AzureConfig(Config config) {
     super(config);
@@ -60,6 +63,7 @@ public class AzureConfig extends MapConfig {
   public String getAzureBlobName() {
     return blobName;
   }
+
   public long getAzureBlobLength() {
     return getLong(AZURE_PAGEBLOB_LENGTH, DEFAULT_AZURE_PAGEBLOB_LENGTH);
   }
@@ -67,6 +71,4 @@ public class AzureConfig extends MapConfig {
   public String getAzureTableName() {
     return tableName;
   }
-
-}
-
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/f5c5cb22/samza-azure/src/main/java/org/apache/samza/AzureException.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/AzureException.java b/samza-azure/src/main/java/org/apache/samza/AzureException.java
new file mode 100644
index 0000000..12da984
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/AzureException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza;
+
+/**
+ * Unchecked exception that Azure throws when something goes wrong.
+ */
+public class AzureException extends RuntimeException {
+
+  public AzureException() {
+    super();
+  }
+
+  public AzureException(String s, Throwable t) {
+    super(s, t);
+  }
+
+  public AzureException(String s) {
+    super(s);
+  }
+
+  public AzureException(Throwable t) {
+    super(t);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f5c5cb22/samza-azure/src/main/java/org/apache/samza/LeaseBlobManager.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/LeaseBlobManager.java b/samza-azure/src/main/java/org/apache/samza/LeaseBlobManager.java
new file mode 100644
index 0000000..3d6b13b
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/LeaseBlobManager.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza;
+
+import com.microsoft.azure.storage.AccessCondition;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudPageBlob;
+import org.eclipse.jetty.http.HttpStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Helper class for lease blob operations.
+ */
+public class LeaseBlobManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LeaseBlobManager.class);
+  private CloudPageBlob leaseBlob;
+
+  public LeaseBlobManager(CloudPageBlob leaseBlob) {
+    this.leaseBlob = leaseBlob;
+  }
+
+  /**
+   * Acquires a lease on a blob. The lease ID is NULL initially.
+   * @param leaseTimeInSec The time in seconds you want to acquire the lease for.
+   * @param leaseId Proposed ID you want to acquire the lease with, null if not proposed.
+   * @return String that represents lease ID.  Null if acquireLease is unsuccessful because the blob is leased already.
+   * @throws AzureException If an Azure storage service error occurred. This includes the case where the blob you're trying to lease does not exist.
+   */
+  public String acquireLease(int leaseTimeInSec, String leaseId) {
+    try {
+      String id = leaseBlob.acquireLease(leaseTimeInSec, leaseId);
+      LOG.info("Acquired lease with lease id = " + id);
+      return id;
+    } catch (StorageException storageException) {
+      int httpStatusCode = storageException.getHttpStatusCode();
+      if (httpStatusCode == HttpStatus.CONFLICT_409) {
+        LOG.info("The blob you're trying to acquire is leased already.", storageException);
+      } else if (httpStatusCode == HttpStatus.NOT_FOUND_404) {
+        LOG.error("The blob you're trying to lease does not exist.", storageException);
+        throw new AzureException(storageException);
+      } else {
+        LOG.error("Error acquiring lease!", storageException);
+        throw new AzureException(storageException);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Renews the lease on the blob.
+   * @param leaseId ID of the lease to be renewed.
+   * @return True if lease was renewed successfully, false otherwise.
+   */
+  public boolean renewLease(String leaseId) {
+    try {
+      leaseBlob.renewLease(AccessCondition.generateLeaseCondition(leaseId));
+      return true;
+    } catch (StorageException storageException) {
+      LOG.error("Wasn't able to renew lease with lease id: " + leaseId, storageException);
+      return false;
+    }
+  }
+
+  /**
+   * Releases the lease on the blob.
+   * @param leaseId ID of the lease to be released.
+   * @return True if released successfully, false otherwise.
+   */
+  public boolean releaseLease(String leaseId) {
+    try {
+      leaseBlob.releaseLease(AccessCondition.generateLeaseCondition(leaseId));
+      return true;
+    } catch (StorageException storageException) {
+      LOG.error("Wasn't able to release lease with lease id: " + leaseId, storageException);
+      return false;
+    }
+  }
+
+}
\ No newline at end of file