Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2020/10/29 17:56:57 UTC

[GitHub] [hadoop] steveloughran commented on a change in pull request #1925: HADOOP-16948. Support single writer dirs.

steveloughran commented on a change in pull request #1925:
URL: https://github.com/apache/hadoop/pull/1925#discussion_r514450156



##########
File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
##########
@@ -243,6 +252,16 @@ public String getPrimaryGroup() {
 
   @Override
   public void close() throws IOException {
+    for (SelfRenewingLease lease : leaseRefs.keySet()) {

Review comment:
       Is this likely to take time? I'm worried about what happens if there are network problems when this gets invoked. Ideally this would be done in parallel, but ABFS doesn't (yet) have a thread pool.
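One way to bound the time `close()` spends on a bad network, without giving ABFS a permanent thread pool, is to release the leases in parallel on a short-lived executor with an overall deadline. This is a minimal sketch under stated assumptions: `Lease` is a hypothetical interface standing in for `SelfRenewingLease#free()`, and `freeAll`, the thread count and the timeout are illustrative, not existing ABFS APIs or settings.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Sketch only: Lease and freeAll are hypothetical, not ABFS APIs. */
final class ParallelLeaseRelease {

  /** Hypothetical stand-in for SelfRenewingLease#free(). */
  @FunctionalInterface
  interface Lease {
    void free() throws IOException;
  }

  /**
   * Releases all leases on a short-lived pool, waiting at most
   * timeoutMillis in total so close() cannot hang indefinitely.
   *
   * @return how many leases could not be confirmed as released
   *     (free() failed, timed out, or was skipped due to interruption)
   */
  static int freeAll(List<? extends Lease> leases, int maxThreads, long timeoutMillis) {
    if (leases.isEmpty()) {
      return 0;
    }
    List<Callable<Void>> tasks = new ArrayList<>();
    for (Lease lease : leases) {
      tasks.add(() -> {
        lease.free();
        return null;
      });
    }
    ExecutorService pool =
        Executors.newFixedThreadPool(Math.max(1, Math.min(maxThreads, leases.size())));
    try {
      // invokeAll cancels every task that has not finished when the timeout expires.
      List<Future<Void>> futures = pool.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS);
      int unreleased = 0;
      for (Future<Void> f : futures) {
        if (f.isCancelled()) {
          unreleased++;  // timed out
          continue;
        }
        try {
          f.get();  // already complete; rethrows the failure of free(), if any
        } catch (ExecutionException e) {
          unreleased++;  // free() threw
        }
      }
      return unreleased;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();  // preserve interrupt status for the caller
      return leases.size();
    } finally {
      pool.shutdownNow();  // interrupt anything still running
    }
  }
}
```

A caller would log the returned count rather than fail `close()`. As the `SelfRenewingLease` javadoc notes, a lease that is no longer renewed times out on its own, so giving up after the deadline is a reasonable fallback, assuming the renewal threads are stopped separately.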

##########
File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SelfRenewingLease.java
##########
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import com.google.common.base.Preconditions;

Review comment:
       As well as the usual import grouping/ordering: we've moved to shaded Guava (`org.apache.hadoop.thirdparty.com.google.common`) on trunk.

##########
File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
##########
@@ -685,10 +712,38 @@ public OutputStream openFileForWrite(final Path path, final FileSystem.Statistic
           statistics,
           relativePath,
           offset,
-          populateAbfsOutputStreamContext(isAppendBlob));
+          leaseRefs,
+          populateAbfsOutputStreamContext(isAppendBlob, enableSingleWriter));
     }
   }
 
+  public String acquireLease(final Path path, final int duration) throws AzureBlobFileSystemException {

Review comment:
       Go on, add some javadocs.
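As an illustration of the kind of javadoc being asked for, the sketch below documents the parameters, units, return value and failure modes. It is hypothetical: `LeaseOperations` and `LeaseAcquirer` stand in for the store and `AbfsClient`, the path is a plain `String` rather than a Hadoop `Path`, and the 15 to 60 second (or -1 for infinite) range is the one Azure Storage documents for lease durations, not something stated in this PR.

```java
import java.io.IOException;

/** Hypothetical sketch of a documented acquireLease() entry point. */
final class LeaseOperations {

  /** Sentinel for a lease that never expires unless it is released. */
  static final int INFINITE_LEASE_DURATION = -1;
  static final int MIN_LEASE_DURATION_SECONDS = 15;
  static final int MAX_LEASE_DURATION_SECONDS = 60;

  /** Hypothetical stand-in for the REST call made through AbfsClient. */
  @FunctionalInterface
  interface LeaseAcquirer {
    String acquire(String path, int durationSeconds) throws IOException;
  }

  private final LeaseAcquirer acquirer;

  LeaseOperations(LeaseAcquirer acquirer) {
    this.acquirer = acquirer;
  }

  /**
   * Acquire a lease on a file so that only the lease holder can write to it.
   *
   * @param path path of the file to lease
   * @param duration lease duration in seconds: 15 to 60 inclusive, or
   *     {@link #INFINITE_LEASE_DURATION} for a lease that lasts until released
   * @return the lease ID; it must accompany later writes to the file and
   *     the call that releases the lease
   * @throws IllegalArgumentException if {@code duration} is outside the allowed range
   * @throws IOException if the store rejects the request, for example because
   *     another client already holds a lease on {@code path}
   */
  String acquireLease(String path, int duration) throws IOException {
    if (duration != INFINITE_LEASE_DURATION
        && (duration < MIN_LEASE_DURATION_SECONDS || duration > MAX_LEASE_DURATION_SECONDS)) {
      throw new IllegalArgumentException(
          "Lease duration must be 15-60 seconds or -1, was " + duration + " for " + path);
    }
    return acquirer.acquire(path, duration);
  }
}
```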

##########
File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SelfRenewingLease.java
##########
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_ACQUIRING_LEASE;
+
+/**
+ * An Azure blob lease that automatically renews itself indefinitely
+ * using a background thread. Use it to synchronize distributed processes,
+ * or to prevent writes to the blob by other processes that don't
+ * have the lease.
+ *
+ * Creating a new Lease object blocks the caller until the Azure blob lease is
+ * acquired.
+ *
+ * Call free() to release the Lease.
+ *
+ * You can use this Lease like a distributed lock. If the holder process
+ * dies, the lease will time out since it won't be renewed.
+ *
+ * See also {@link org.apache.hadoop.fs.azure.SelfRenewingLease}.
+ */
+public final class SelfRenewingLease {
+
+  private final AbfsClient client;
+  private final Path path;
+  private Thread renewer;
+  private volatile boolean leaseFreed;
+  private String leaseID = null;
+  private static final int LEASE_TIMEOUT = 60;  // Lease timeout in seconds
+
+  // Time to wait to renew lease in milliseconds
+  public static final int LEASE_RENEWAL_PERIOD = 40000;
+  public static final Logger LOG = LoggerFactory.getLogger(SelfRenewingLease.class);
+
+  // Used to allocate thread serial numbers in thread name
+  private static AtomicInteger threadNumber = new AtomicInteger(0);
+
+
+  // Time to wait to retry getting the lease in milliseconds
+  static final int LEASE_ACQUIRE_RETRY_INTERVAL = 2000;
+  static final int LEASE_MAX_RETRIES = 5;
+
+  public static class LeaseException extends AzureBlobFileSystemException {
+    public LeaseException(Exception innerException) {
+      super(ERR_ACQUIRING_LEASE, innerException);
+    }
+  }
+
+  public SelfRenewingLease(AbfsClient client, Path path) throws AzureBlobFileSystemException {
+
+    this.leaseFreed = false;
+    this.client = client;
+    this.path = path;
+
+    // Try to get the lease a specified number of times, else throw an error
+    int numRetries = 0;
+    while (leaseID == null && numRetries < LEASE_MAX_RETRIES) {
+      numRetries++;
+      try {
+        LOG.debug("lease path: {}", path);
+        final AbfsRestOperation op =
+            client.acquireLease(getRelativePath(path),
+                LEASE_TIMEOUT);
+
+        leaseID = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_LEASE_ID);
+      } catch (IOException e) {
+        if (numRetries < LEASE_MAX_RETRIES) {
+          LOG.info("Caught exception when trying to acquire lease on blob {}, retrying: {}", path,
+              e.getMessage());
+          LOG.debug("Exception acquiring lease", e);
+        } else {
+          throw new LeaseException(e);
+        }
+      }
+      if (leaseID == null) {
+        try {
+          Thread.sleep(LEASE_ACQUIRE_RETRY_INTERVAL);

Review comment:
       I'd prefer you use our normal `RetryPolicy` if possible.
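Hadoop already ships retry machinery in `org.apache.hadoop.io.retry`; for example, `RetryPolicies.retryUpToMaximumCountWithFixedSleep(maxRetries, sleepTime, timeUnit)` returns a `RetryPolicy` whose `shouldRetry(...)` decides whether to retry and how long to wait. To keep the example self-contained, the sketch below substitutes a hypothetical stdlib-only `Policy` interface; it only shows the shape of the change, with the retry decision and delay coming from a policy object instead of a hand-rolled counter and `Thread.sleep`. The `Sleeper` hook is also hypothetical and exists so the loop can be tested without actually sleeping.

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.Callable;

/** Stdlib-only sketch; Policy, Sleeper and acquireWithRetries are hypothetical. */
final class LeaseRetrySketch {

  /** Hypothetical stand-in for a RetryPolicy: delay in ms before the next attempt, or -1 to give up. */
  @FunctionalInterface
  interface Policy {
    long delayBeforeRetry(Exception failure, int failedAttempts);
  }

  /** Hypothetical hook around Thread.sleep so tests need not really wait. */
  @FunctionalInterface
  interface Sleeper {
    void sleep(long millis) throws InterruptedException;
  }

  /** Allow maxAttempts attempts in total, waiting sleepMillis between them. */
  static Policy fixedSleep(int maxAttempts, long sleepMillis) {
    return (failure, failedAttempts) -> failedAttempts < maxAttempts ? sleepMillis : -1L;
  }

  /** Calls acquire until it returns a lease ID or the policy gives up. */
  static String acquireWithRetries(Callable<String> acquire, Policy policy, Sleeper sleeper)
      throws IOException {
    int failedAttempts = 0;
    while (true) {
      try {
        String leaseId = acquire.call();
        if (leaseId != null) {
          return leaseId;
        }
        throw new IOException("No lease ID in response");
      } catch (Exception e) {
        failedAttempts++;
        long delay = policy.delayBeforeRetry(e, failedAttempts);
        if (delay < 0) {
          // Out of attempts: surface the last failure to the caller.
          throw e instanceof IOException ? (IOException) e : new IOException(e);
        }
        try {
          sleeper.sleep(delay);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          InterruptedIOException iioe =
              new InterruptedIOException("Interrupted while waiting to retry lease acquisition");
          iioe.initCause(ie);
          throw iioe;
        }
      }
    }
  }
}
```

With `fixedSleep(5, 2000)` and `Thread::sleep` as the sleeper, this mirrors `LEASE_MAX_RETRIES` and `LEASE_ACQUIRE_RETRY_INTERVAL` from the diff: five attempts in total, two seconds apart. The sketch also treats a response without a lease ID as a failed attempt, so it either returns an ID or throws.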




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


