You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/08/07 21:15:36 UTC
samza git commit: SAMZA-1376: Create a leasing utility class for
blobs in Azure
Repository: samza
Updated Branches:
refs/heads/master cf5efe761 -> f5c5cb223
SAMZA-1376: Create a leasing utility class for blobs in Azure
navina
PR 1: AzureClient + AzureConfig
**PR 2: LeaseBlobManager** (current PR)
Author: PawasChhokra <Jaimatadi1$>
Author: PawasChhokra <pa...@gmail.com>
Reviewers: Navina Ramesh <na...@apache.org>, Shanthoosh Venkataraman <sv...@linkedin.com>
Closes #256 from PawasChhokra/LeaseUtils
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f5c5cb22
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f5c5cb22
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f5c5cb22
Branch: refs/heads/master
Commit: f5c5cb2233f6e141c34ea372a49a7b07f163a359
Parents: cf5efe7
Author: Pawas Chhokra <pa...@gmail.com>
Authored: Mon Aug 7 14:15:23 2017 -0700
Committer: navina <na...@apache.org>
Committed: Mon Aug 7 14:15:23 2017 -0700
----------------------------------------------------------------------
.gitignore | 2 +-
.../main/java/org/apache/samza/AzureClient.java | 16 +++-
.../main/java/org/apache/samza/AzureConfig.java | 10 +-
.../java/org/apache/samza/AzureException.java | 43 +++++++++
.../java/org/apache/samza/LeaseBlobManager.java | 99 ++++++++++++++++++++
5 files changed, 160 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/f5c5cb22/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index d4dcaa1..7cbffe7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -28,4 +28,4 @@ docs/learn/documentation/*/rest/javadocs
out/
*.patch
**.pyc
-samza-shell/src/main/visualizer/plan.json
+samza-shell/src/main/visualizer/plan.json
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/f5c5cb22/samza-azure/src/main/java/org/apache/samza/AzureClient.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/AzureClient.java b/samza-azure/src/main/java/org/apache/samza/AzureClient.java
index b5884cd..7c12055 100644
--- a/samza-azure/src/main/java/org/apache/samza/AzureClient.java
+++ b/samza-azure/src/main/java/org/apache/samza/AzureClient.java
@@ -38,19 +38,25 @@ public class AzureClient {
private final CloudTableClient tableClient;
private final CloudBlobClient blobClient;
+ /**
+ * Creates a reference to the Azure Storage account according to the connection string that the client passes.
+ * Also creates references to Azure Blob Storage and Azure Table Storage.
+ * @param storageConnectionString Connection string to conenct to Azure Storage Account, format: "DefaultEndpointsProtocol=<https>;AccountName=<>;AccountKey=<>"
+ * @throws AzureException If an Azure storage service error occurred, or when the storageConnectionString is invalid.
+ */
AzureClient(String storageConnectionString) {
try {
account = CloudStorageAccount.parse(storageConnectionString);
blobClient = account.createCloudBlobClient();
tableClient = account.createCloudTableClient();
} catch (IllegalArgumentException | URISyntaxException e) {
- LOG.error("\nConnection string {} specifies an invalid URI.", storageConnectionString);
+ LOG.error("Connection string {} specifies an invalid URI.", storageConnectionString);
LOG.error("Please confirm the connection string is in the Azure connection string format.");
- throw new SamzaException(e);
+ throw new AzureException(e);
} catch (InvalidKeyException e) {
- LOG.error("\nConnection string {} specifies an invalid key.", storageConnectionString);
+ LOG.error("Connection string {} specifies an invalid key.", storageConnectionString);
LOG.error("Please confirm the AccountName and AccountKey in the connection string are valid.");
- throw new SamzaException(e);
+ throw new AzureException(e);
}
}
@@ -61,4 +67,4 @@ public class AzureClient {
public CloudTableClient getTableClient() {
return tableClient;
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/f5c5cb22/samza-azure/src/main/java/org/apache/samza/AzureConfig.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/AzureConfig.java b/samza-azure/src/main/java/org/apache/samza/AzureConfig.java
index b88d3c0..32b8082 100644
--- a/samza-azure/src/main/java/org/apache/samza/AzureConfig.java
+++ b/samza-azure/src/main/java/org/apache/samza/AzureConfig.java
@@ -25,16 +25,19 @@ import org.apache.samza.config.ConfigException;
import org.apache.samza.config.MapConfig;
+/**
+ * Config class for reading all user defined parameters for Azure driven coordination services.
+ */
public class AzureConfig extends MapConfig {
// Connection string for Azure Storage Account, format: "DefaultEndpointsProtocol=<https>;AccountName=<>;AccountKey=<>"
public static final String AZURE_STORAGE_CONNECT = "azure.storage.connect";
public static final String AZURE_PAGEBLOB_LENGTH = "job.coordinator.azure.blob.length";
+ public static final long DEFAULT_AZURE_PAGEBLOB_LENGTH = 5120000;
private static String containerName;
private static String blobName;
private static String tableName;
- public static final long DEFAULT_AZURE_PAGEBLOB_LENGTH = 5120000;
public AzureConfig(Config config) {
super(config);
@@ -60,6 +63,7 @@ public class AzureConfig extends MapConfig {
public String getAzureBlobName() {
return blobName;
}
+
public long getAzureBlobLength() {
return getLong(AZURE_PAGEBLOB_LENGTH, DEFAULT_AZURE_PAGEBLOB_LENGTH);
}
@@ -67,6 +71,4 @@ public class AzureConfig extends MapConfig {
public String getAzureTableName() {
return tableName;
}
-
-}
-
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/f5c5cb22/samza-azure/src/main/java/org/apache/samza/AzureException.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/AzureException.java b/samza-azure/src/main/java/org/apache/samza/AzureException.java
new file mode 100644
index 0000000..12da984
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/AzureException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza;
+
+/**
+ * Unchecked exception that Azure throws when something goes wrong .
+ */
+public class AzureException extends RuntimeException {
+
+ public AzureException() {
+ super();
+ }
+
+ public AzureException(String s, Throwable t) {
+ super(s, t);
+ }
+
+ public AzureException(String s) {
+ super(s);
+ }
+
+ public AzureException(Throwable t) {
+ super(t);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/f5c5cb22/samza-azure/src/main/java/org/apache/samza/LeaseBlobManager.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/LeaseBlobManager.java b/samza-azure/src/main/java/org/apache/samza/LeaseBlobManager.java
new file mode 100644
index 0000000..3d6b13b
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/LeaseBlobManager.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza;
+
+import com.microsoft.azure.storage.AccessCondition;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudPageBlob;
+import org.eclipse.jetty.http.HttpStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Helper class for lease blob operations.
+ */
+public class LeaseBlobManager {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LeaseBlobManager.class);
+ private CloudPageBlob leaseBlob;
+
+ public LeaseBlobManager(CloudPageBlob leaseBlob) {
+ this.leaseBlob = leaseBlob;
+ }
+
+ /**
+ * Acquires a lease on a blob. The lease ID is NULL initially.
+ * @param leaseTimeInSec The time in seconds you want to acquire the lease for.
+ * @param leaseId Proposed ID you want to acquire the lease with, null if not proposed.
+ * @return String that represents lease ID. Null if acquireLease is unsuccessful because the blob is leased already.
+ * @throws AzureException If a Azure storage service error occurred. This includes the case where the blob you're trying to lease does not exist.
+ */
+ public String acquireLease(int leaseTimeInSec, String leaseId) {
+ try {
+ String id = leaseBlob.acquireLease(leaseTimeInSec, leaseId);
+ LOG.info("Acquired lease with lease id = " + id);
+ return id;
+ } catch (StorageException storageException) {
+ int httpStatusCode = storageException.getHttpStatusCode();
+ if (httpStatusCode == HttpStatus.CONFLICT_409) {
+ LOG.info("The blob you're trying to acquire is leased already.", storageException);
+ } else if (httpStatusCode == HttpStatus.NOT_FOUND_404) {
+ LOG.error("The blob you're trying to lease does not exist.", storageException);
+ throw new AzureException(storageException);
+ } else {
+ LOG.error("Error acquiring lease!", storageException);
+ throw new AzureException(storageException);
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Renews the lease on the blob.
+ * @param leaseId ID of the lease to be renewed.
+ * @return True if lease was renewed successfully, false otherwise.
+ */
+ public boolean renewLease(String leaseId) {
+ try {
+ leaseBlob.renewLease(AccessCondition.generateLeaseCondition(leaseId));
+ return true;
+ } catch (StorageException storageException) {
+ LOG.error("Wasn't able to renew lease with lease id: " + leaseId, storageException);
+ return false;
+ }
+ }
+
+ /**
+ * Releases the lease on the blob.
+ * @param leaseId ID of the lease to be released.
+ * @return True if released successfully, false otherwise.
+ */
+ public boolean releaseLease(String leaseId) {
+ try {
+ leaseBlob.releaseLease(AccessCondition.generateLeaseCondition(leaseId));
+ return true;
+ } catch (StorageException storageException) {
+ LOG.error("Wasn't able to release lease with lease id: " + leaseId, storageException);
+ return false;
+ }
+ }
+
+}
\ No newline at end of file