Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/17 19:50:53 UTC

[GitHub] [beam] pabloem commented on a change in pull request #12581: [BEAM-10378] Add Azure Blob Storage Filesystem

pabloem commented on a change in pull request #12581:
URL: https://github.com/apache/beam/pull/12581#discussion_r471651523



##########
File path: sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java
##########
@@ -17,67 +17,438 @@
  */
 package org.apache.beam.sdk.io.azure.blobstore;
 
+import static java.nio.channels.Channels.newChannel;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.azure.core.http.rest.PagedIterable;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.BlobProperties;
+import com.azure.storage.blob.models.BlobStorageException;
+import com.azure.storage.blob.models.ListBlobsOptions;
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.SharedAccessAccountPolicy;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
 import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.io.FileSystem;
+import org.apache.beam.sdk.io.azure.options.BlobstoreClientBuilderFactory;
+import org.apache.beam.sdk.io.azure.options.BlobstoreOptions;
 import org.apache.beam.sdk.io.fs.CreateOptions;
 import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class AzureBlobStoreFileSystem extends FileSystem<AzfsResourceId> {
 
+  private static final Logger LOG = LoggerFactory.getLogger(AzureBlobStoreFileSystem.class);
+
+  private static final ImmutableSet<String> NON_READ_SEEK_EFFICIENT_ENCODINGS =
+      ImmutableSet.of("gzip");
+
+  private Supplier<BlobServiceClient> client;
+  private final BlobstoreOptions options;
+
+  AzureBlobStoreFileSystem(BlobstoreOptions options) {
+    this.options = checkNotNull(options, "options");
+
+    BlobServiceClientBuilder builder =
+        InstanceBuilder.ofType(BlobstoreClientBuilderFactory.class)
+            .fromClass(options.getBlobstoreClientFactoryClass())
+            .build()
+            .createBuilder(options);
+
+    // The Supplier is to make sure we don't call .build() unless we are actually using Azure.
+    client = Suppliers.memoize(builder::buildClient);
+  }
+
+  @VisibleForTesting
+  void setClient(BlobServiceClient client) {
+    this.client = Suppliers.ofInstance(client);
+  }
+
+  @VisibleForTesting
+  BlobServiceClient getClient() {
+    return client.get();
+  }
+
   @Override
   protected String getScheme() {
-    return "azfs";
+    return AzfsResourceId.SCHEME;
   }
 
   @Override
-  protected List<MatchResult> match(List<String> specs) throws IOException {
-    // TODO
-    return null;
+  protected List<MatchResult> match(List<String> specs) {
+    List<AzfsResourceId> paths =
+        specs.stream().map(AzfsResourceId::fromUri).collect(Collectors.toList());
+    List<AzfsResourceId> globs = new ArrayList<>();
+    List<AzfsResourceId> nonGlobs = new ArrayList<>();
+    List<Boolean> isGlobBooleans = new ArrayList<>();
+
+    for (AzfsResourceId path : paths) {
+      if (path.isWildcard()) {
+        globs.add(path);
+        isGlobBooleans.add(true);
+      } else {
+        nonGlobs.add(path);
+        isGlobBooleans.add(false);
+      }
+    }
+
+    Iterator<MatchResult> globMatches = matchGlobPaths(globs).iterator();
+    Iterator<MatchResult> nonGlobMatches = matchNonGlobPaths(nonGlobs).iterator();
+
+    ImmutableList.Builder<MatchResult> matchResults = ImmutableList.builder();
+    for (Boolean isGlob : isGlobBooleans) {
+      if (isGlob) {
+        checkState(globMatches.hasNext(), "Expect globMatches has next.");
+        matchResults.add(globMatches.next());
+      } else {
+        checkState(nonGlobMatches.hasNext(), "Expect nonGlobMatches has next.");
+        matchResults.add(nonGlobMatches.next());
+      }
+    }
+    checkState(!globMatches.hasNext(), "Expect no more elements in globMatches.");
+    checkState(!nonGlobMatches.hasNext(), "Expect no more elements in nonGlobMatches.");
+
+    return matchResults.build();
+  }
+
+  /**
+   * Expands glob expressions to regular expressions.
+   *
+   * @param globExp the glob expression to expand
+   * @return a string with the regular expression this glob expands to
+   */
+  @VisibleForTesting
+  static String wildcardToRegexp(String globExp) {

Review comment:
       I see that this code is duplicated in other filesystems. Would you be willing to move this to something like FileSystemUtils.java to live right next to FileSystem.java and remove the duplicated code from other places? (this can be done as part of a follow-up PR, but can you file a JIRA ticket to track it?)
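
       For reference, a rough sketch of what that could look like (the `FileSystemUtils` name and its placement next to `FileSystem.java` are only suggestions, not existing Beam code; the method bodies are the ones already in this diff, copied unchanged):

```java
package org.apache.beam.sdk.io;

/** Hypothetical shared helpers for {@link FileSystem} implementations. */
final class FileSystemUtils {

  private FileSystemUtils() {}

  /**
   * Expands glob expressions to regular expressions.
   *
   * @param globExp the glob expression to expand
   * @return a string with the regular expression this glob expands to
   */
  static String wildcardToRegexp(String globExp) {
    StringBuilder dst = new StringBuilder();
    char[] src = globExp.replace("**/*", "**").toCharArray();
    int i = 0;
    while (i < src.length) {
      char c = src[i++];
      switch (c) {
        case '*':
          // One char lookahead for **
          if (i < src.length && src[i] == '*') {
            dst.append(".*");
            ++i;
          } else {
            dst.append("[^/]*");
          }
          break;
        case '?':
          dst.append("[^/]");
          break;
        case '.':
        case '+':
        case '{':
        case '}':
        case '(':
        case ')':
        case '|':
        case '^':
        case '$':
          // These need to be escaped in regular expressions
          dst.append('\\').append(c);
          break;
        case '\\':
          i = doubleSlashes(dst, src, i);
          break;
        default:
          dst.append(c);
          break;
      }
    }
    return dst.toString();
  }

  private static int doubleSlashes(StringBuilder dst, char[] src, int i) {
    // Emit the next character without special interpretation.
    // NOTE: the end-of-input check below is the one questioned later in this review.
    dst.append("\\\\");
    if ((i - 1) != src.length) {
      dst.append(src[i]);
      i++;
    } else {
      // A backslash at the very end is treated like an escaped backslash
      dst.append('\\');
    }
    return i;
  }
}
```

       The Azure/S3/GCS filesystems would then delegate to this helper instead of each keeping its own copy.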

##########
File path: sdks/java/io/azure/testingFiles/in.txt
##########
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */

Review comment:
       you can move this to `src/test/resources`
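
       Once it lives there, the tests can read it off the classpath instead of a source-tree path, e.g. (illustrative sketch only; the helper name is made up):

```java
package org.apache.beam.sdk.io.azure.blobstore;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

/** Illustrative helper: loads the fixture once it sits in src/test/resources. */
final class TestFixtures {

  private TestFixtures() {}

  /** Reads src/test/resources/in.txt from the test classpath as UTF-8. */
  static String readInTxt() throws IOException {
    try (InputStream in = TestFixtures.class.getResourceAsStream("/in.txt")) {
      if (in == null) {
        throw new IOException("in.txt is not on the test classpath");
      }
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      byte[] buf = new byte[4096];
      int n;
      while ((n = in.read(buf)) != -1) {
        out.write(buf, 0, n);
      }
      return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }
  }
}
```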

##########
File path: sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzfsResourceId.java
##########
@@ -105,6 +129,17 @@ boolean isWildcard() {
     return GLOB_PREFIX.matcher(blob).matches();
   }
 
+  String getBlobNonWildcardPrefix() {
+    Matcher m = GLOB_PREFIX.matcher(getBlob());
+    checkArgument(
+        m.matches(), String.format("Glob expression: [%s] is not expandable.", getBlob()));
+    return m.group("PREFIX");
+  }
+
+  public boolean isContainer() {
+    return blob == null;

Review comment:
       is it possible to have `blob == null` and also `container == null` so that it's an account?
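
       If that combination can occur, splitting the check might make the intent explicit, e.g. (sketch only; it assumes AzfsResourceId keeps the container segment in a `container` field next to `blob`):

```java
// Inside AzfsResourceId:
public boolean isContainer() {
  // A container-level resource has a container segment but no blob segment.
  return container != null && blob == null;
}

public boolean isAccount() {
  // An account-level resource has neither a container nor a blob segment.
  return container == null && blob == null;
}
```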

##########
File path: sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java
##########
@@ -17,67 +17,438 @@
  */
 package org.apache.beam.sdk.io.azure.blobstore;
 
+import static java.nio.channels.Channels.newChannel;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.azure.core.http.rest.PagedIterable;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.BlobProperties;
+import com.azure.storage.blob.models.BlobStorageException;
+import com.azure.storage.blob.models.ListBlobsOptions;
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.SharedAccessAccountPolicy;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
 import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.io.FileSystem;
+import org.apache.beam.sdk.io.azure.options.BlobstoreClientBuilderFactory;
+import org.apache.beam.sdk.io.azure.options.BlobstoreOptions;
 import org.apache.beam.sdk.io.fs.CreateOptions;
 import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class AzureBlobStoreFileSystem extends FileSystem<AzfsResourceId> {
 
+  private static final Logger LOG = LoggerFactory.getLogger(AzureBlobStoreFileSystem.class);
+
+  private static final ImmutableSet<String> NON_READ_SEEK_EFFICIENT_ENCODINGS =
+      ImmutableSet.of("gzip");
+
+  private Supplier<BlobServiceClient> client;
+  private final BlobstoreOptions options;
+
+  AzureBlobStoreFileSystem(BlobstoreOptions options) {
+    this.options = checkNotNull(options, "options");
+
+    BlobServiceClientBuilder builder =
+        InstanceBuilder.ofType(BlobstoreClientBuilderFactory.class)
+            .fromClass(options.getBlobstoreClientFactoryClass())
+            .build()
+            .createBuilder(options);
+
+    // The Supplier is to make sure we don't call .build() unless we are actually using Azure.
+    client = Suppliers.memoize(builder::buildClient);
+  }
+
+  @VisibleForTesting
+  void setClient(BlobServiceClient client) {
+    this.client = Suppliers.ofInstance(client);
+  }
+
+  @VisibleForTesting
+  BlobServiceClient getClient() {
+    return client.get();
+  }
+
   @Override
   protected String getScheme() {
-    return "azfs";
+    return AzfsResourceId.SCHEME;
   }
 
   @Override
-  protected List<MatchResult> match(List<String> specs) throws IOException {
-    // TODO
-    return null;
+  protected List<MatchResult> match(List<String> specs) {
+    List<AzfsResourceId> paths =
+        specs.stream().map(AzfsResourceId::fromUri).collect(Collectors.toList());
+    List<AzfsResourceId> globs = new ArrayList<>();
+    List<AzfsResourceId> nonGlobs = new ArrayList<>();
+    List<Boolean> isGlobBooleans = new ArrayList<>();
+
+    for (AzfsResourceId path : paths) {
+      if (path.isWildcard()) {
+        globs.add(path);
+        isGlobBooleans.add(true);
+      } else {
+        nonGlobs.add(path);
+        isGlobBooleans.add(false);
+      }
+    }
+
+    Iterator<MatchResult> globMatches = matchGlobPaths(globs).iterator();
+    Iterator<MatchResult> nonGlobMatches = matchNonGlobPaths(nonGlobs).iterator();
+
+    ImmutableList.Builder<MatchResult> matchResults = ImmutableList.builder();
+    for (Boolean isGlob : isGlobBooleans) {
+      if (isGlob) {
+        checkState(globMatches.hasNext(), "Expect globMatches has next.");
+        matchResults.add(globMatches.next());
+      } else {
+        checkState(nonGlobMatches.hasNext(), "Expect nonGlobMatches has next.");
+        matchResults.add(nonGlobMatches.next());
+      }
+    }
+    checkState(!globMatches.hasNext(), "Expect no more elements in globMatches.");
+    checkState(!nonGlobMatches.hasNext(), "Expect no more elements in nonGlobMatches.");
+
+    return matchResults.build();
+  }
+
+  /**
+   * Expands glob expressions to regular expressions.
+   *
+   * @param globExp the glob expression to expand
+   * @return a string with the regular expression this glob expands to
+   */
+  @VisibleForTesting
+  static String wildcardToRegexp(String globExp) {
+    StringBuilder dst = new StringBuilder();
+    char[] src = globExp.replace("**/*", "**").toCharArray();
+    int i = 0;
+    while (i < src.length) {
+      char c = src[i++];
+      switch (c) {
+        case '*':
+          // One char lookahead for **
+          if (i < src.length && src[i] == '*') {
+            dst.append(".*");
+            ++i;
+          } else {
+            dst.append("[^/]*");
+          }
+          break;
+        case '?':
+          dst.append("[^/]");
+          break;
+        case '.':
+        case '+':
+        case '{':
+        case '}':
+        case '(':
+        case ')':
+        case '|':
+        case '^':
+        case '$':
+          // These need to be escaped in regular expressions
+          dst.append('\\').append(c);
+          break;
+        case '\\':
+          i = doubleSlashes(dst, src, i);
+          break;
+        default:
+          dst.append(c);
+          break;
+      }
+    }
+    return dst.toString();
+  }
+
+  private static int doubleSlashes(StringBuilder dst, char[] src, int i) {
+    // Emit the next character without special interpretation
+    dst.append("\\\\");
+    if ((i - 1) != src.length) {
+      dst.append(src[i]);
+      i++;
+    } else {
+      // A backslash at the very end is treated like an escaped backslash
+      dst.append('\\');
+    }
+    return i;
+  }
+
+  private List<MatchResult> matchGlobPaths(List<AzfsResourceId> globs) {
+    return FluentIterable.from(globs).transform(this::expand).toList();
+  }
+
+  /** Expands a pattern into {@link MatchResult}. */
+  @VisibleForTesting
+  MatchResult expand(AzfsResourceId azfsPattern) {
+
+    checkArgument(azfsPattern.isWildcard(), "is Wildcard");
+    String blobPrefix = azfsPattern.getBlobNonWildcardPrefix();
+    Pattern wildcardAsRegexp = Pattern.compile(wildcardToRegexp(azfsPattern.getBlob()));
+
+    LOG.debug(
+        "matching files in container {}, prefix {} against pattern {}",
+        azfsPattern.getContainer(),
+        blobPrefix,
+        wildcardAsRegexp.toString());
+
+    ListBlobsOptions listOptions = new ListBlobsOptions().setPrefix(blobPrefix);
+    Duration timeout = Duration.ZERO.plusMinutes(1);
+
+    String account = azfsPattern.getAccount();
+    String container = azfsPattern.getContainer();
+    BlobContainerClient blobContainerClient = client.get().getBlobContainerClient(container);
+    PagedIterable<BlobItem> blobs = blobContainerClient.listBlobs(listOptions, timeout);
+    List<MatchResult.Metadata> results = new ArrayList<>();
+
+    blobs.forEach(
+        blob -> {
+          String name = blob.getName();
+          if (wildcardAsRegexp.matcher(name).matches() && !name.endsWith("/")) {
+            LOG.debug("Matched object: {}", name);
+
+            BlobProperties properties = blobContainerClient.getBlobClient(name).getProperties();
+            AzfsResourceId rid =
+                AzfsResourceId.fromComponents(account, container, name)
+                    .withSize(properties.getBlobSize())
+                    .withLastModified(Date.from(properties.getLastModified().toInstant()));
+
+            results.add(toMetadata(rid, properties.getContentEncoding()));
+          }
+        });
+
+    return MatchResult.create(MatchResult.Status.OK, results);

Review comment:
       What happens if there are no matches whatsoever? I guess `results` would be empty and that's it?
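
       If an empty expansion should be surfaced explicitly rather than as an OK result with an empty metadata list, one option would be something like the following at the end of `expand` (sketch only; whether that is the right contract depends on how the layers above treat empty matches):

```java
if (results.isEmpty()) {
  // Nothing under the prefix matched the glob.
  return MatchResult.create(
      MatchResult.Status.NOT_FOUND,
      new FileNotFoundException(String.format("No files found matching %s", azfsPattern)));
}
return MatchResult.create(MatchResult.Status.OK, results);
```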

##########
File path: sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java
##########
@@ -17,67 +17,438 @@
  */
 package org.apache.beam.sdk.io.azure.blobstore;
 
+import static java.nio.channels.Channels.newChannel;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.azure.core.http.rest.PagedIterable;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.BlobProperties;
+import com.azure.storage.blob.models.BlobStorageException;
+import com.azure.storage.blob.models.ListBlobsOptions;
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.SharedAccessAccountPolicy;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
 import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.io.FileSystem;
+import org.apache.beam.sdk.io.azure.options.BlobstoreClientBuilderFactory;
+import org.apache.beam.sdk.io.azure.options.BlobstoreOptions;
 import org.apache.beam.sdk.io.fs.CreateOptions;
 import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class AzureBlobStoreFileSystem extends FileSystem<AzfsResourceId> {
 
+  private static final Logger LOG = LoggerFactory.getLogger(AzureBlobStoreFileSystem.class);
+
+  private static final ImmutableSet<String> NON_READ_SEEK_EFFICIENT_ENCODINGS =
+      ImmutableSet.of("gzip");
+
+  private Supplier<BlobServiceClient> client;
+  private final BlobstoreOptions options;
+
+  AzureBlobStoreFileSystem(BlobstoreOptions options) {
+    this.options = checkNotNull(options, "options");
+
+    BlobServiceClientBuilder builder =
+        InstanceBuilder.ofType(BlobstoreClientBuilderFactory.class)
+            .fromClass(options.getBlobstoreClientFactoryClass())
+            .build()
+            .createBuilder(options);
+
+    // The Supplier is to make sure we don't call .build() unless we are actually using Azure.
+    client = Suppliers.memoize(builder::buildClient);
+  }
+
+  @VisibleForTesting
+  void setClient(BlobServiceClient client) {
+    this.client = Suppliers.ofInstance(client);
+  }
+
+  @VisibleForTesting
+  BlobServiceClient getClient() {
+    return client.get();
+  }
+
   @Override
   protected String getScheme() {
-    return "azfs";
+    return AzfsResourceId.SCHEME;
   }
 
   @Override
-  protected List<MatchResult> match(List<String> specs) throws IOException {
-    // TODO
-    return null;
+  protected List<MatchResult> match(List<String> specs) {
+    List<AzfsResourceId> paths =
+        specs.stream().map(AzfsResourceId::fromUri).collect(Collectors.toList());
+    List<AzfsResourceId> globs = new ArrayList<>();
+    List<AzfsResourceId> nonGlobs = new ArrayList<>();
+    List<Boolean> isGlobBooleans = new ArrayList<>();
+
+    for (AzfsResourceId path : paths) {
+      if (path.isWildcard()) {
+        globs.add(path);
+        isGlobBooleans.add(true);
+      } else {
+        nonGlobs.add(path);
+        isGlobBooleans.add(false);
+      }
+    }
+
+    Iterator<MatchResult> globMatches = matchGlobPaths(globs).iterator();
+    Iterator<MatchResult> nonGlobMatches = matchNonGlobPaths(nonGlobs).iterator();
+
+    ImmutableList.Builder<MatchResult> matchResults = ImmutableList.builder();
+    for (Boolean isGlob : isGlobBooleans) {
+      if (isGlob) {
+        checkState(globMatches.hasNext(), "Expect globMatches has next.");
+        matchResults.add(globMatches.next());
+      } else {
+        checkState(nonGlobMatches.hasNext(), "Expect nonGlobMatches has next.");
+        matchResults.add(nonGlobMatches.next());
+      }
+    }
+    checkState(!globMatches.hasNext(), "Expect no more elements in globMatches.");
+    checkState(!nonGlobMatches.hasNext(), "Expect no more elements in nonGlobMatches.");

Review comment:
       For lines 133 and 134, can you add something like "Internal error encountered in AzureBlobStoreFileSystem: Expect no more elements in ..."?
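
       i.e. something like:

```java
checkState(
    !globMatches.hasNext(),
    "Internal error encountered in AzureBlobStoreFileSystem: expected no more elements in globMatches.");
checkState(
    !nonGlobMatches.hasNext(),
    "Internal error encountered in AzureBlobStoreFileSystem: expected no more elements in nonGlobMatches.");
```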

##########
File path: sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java
##########
@@ -17,67 +17,438 @@
  */
 package org.apache.beam.sdk.io.azure.blobstore;
 
+import static java.nio.channels.Channels.newChannel;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.azure.core.http.rest.PagedIterable;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.BlobProperties;
+import com.azure.storage.blob.models.BlobStorageException;
+import com.azure.storage.blob.models.ListBlobsOptions;
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.SharedAccessAccountPolicy;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
 import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.io.FileSystem;
+import org.apache.beam.sdk.io.azure.options.BlobstoreClientBuilderFactory;
+import org.apache.beam.sdk.io.azure.options.BlobstoreOptions;
 import org.apache.beam.sdk.io.fs.CreateOptions;
 import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class AzureBlobStoreFileSystem extends FileSystem<AzfsResourceId> {
 
+  private static final Logger LOG = LoggerFactory.getLogger(AzureBlobStoreFileSystem.class);
+
+  private static final ImmutableSet<String> NON_READ_SEEK_EFFICIENT_ENCODINGS =
+      ImmutableSet.of("gzip");
+
+  private Supplier<BlobServiceClient> client;
+  private final BlobstoreOptions options;
+
+  AzureBlobStoreFileSystem(BlobstoreOptions options) {
+    this.options = checkNotNull(options, "options");
+
+    BlobServiceClientBuilder builder =
+        InstanceBuilder.ofType(BlobstoreClientBuilderFactory.class)
+            .fromClass(options.getBlobstoreClientFactoryClass())
+            .build()
+            .createBuilder(options);
+
+    // The Supplier is to make sure we don't call .build() unless we are actually using Azure.
+    client = Suppliers.memoize(builder::buildClient);
+  }
+
+  @VisibleForTesting
+  void setClient(BlobServiceClient client) {
+    this.client = Suppliers.ofInstance(client);
+  }
+
+  @VisibleForTesting
+  BlobServiceClient getClient() {
+    return client.get();
+  }
+
   @Override
   protected String getScheme() {
-    return "azfs";
+    return AzfsResourceId.SCHEME;
   }
 
   @Override
-  protected List<MatchResult> match(List<String> specs) throws IOException {
-    // TODO
-    return null;
+  protected List<MatchResult> match(List<String> specs) {
+    List<AzfsResourceId> paths =
+        specs.stream().map(AzfsResourceId::fromUri).collect(Collectors.toList());
+    List<AzfsResourceId> globs = new ArrayList<>();
+    List<AzfsResourceId> nonGlobs = new ArrayList<>();
+    List<Boolean> isGlobBooleans = new ArrayList<>();
+
+    for (AzfsResourceId path : paths) {
+      if (path.isWildcard()) {
+        globs.add(path);
+        isGlobBooleans.add(true);
+      } else {
+        nonGlobs.add(path);
+        isGlobBooleans.add(false);
+      }
+    }
+
+    Iterator<MatchResult> globMatches = matchGlobPaths(globs).iterator();
+    Iterator<MatchResult> nonGlobMatches = matchNonGlobPaths(nonGlobs).iterator();
+
+    ImmutableList.Builder<MatchResult> matchResults = ImmutableList.builder();
+    for (Boolean isGlob : isGlobBooleans) {
+      if (isGlob) {
+        checkState(globMatches.hasNext(), "Expect globMatches has next.");
+        matchResults.add(globMatches.next());
+      } else {
+        checkState(nonGlobMatches.hasNext(), "Expect nonGlobMatches has next.");
+        matchResults.add(nonGlobMatches.next());
+      }
+    }
+    checkState(!globMatches.hasNext(), "Expect no more elements in globMatches.");
+    checkState(!nonGlobMatches.hasNext(), "Expect no more elements in nonGlobMatches.");
+
+    return matchResults.build();
+  }
+
+  /**
+   * Expands glob expressions to regular expressions.
+   *
+   * @param globExp the glob expression to expand
+   * @return a string with the regular expression this glob expands to
+   */
+  @VisibleForTesting
+  static String wildcardToRegexp(String globExp) {
+    StringBuilder dst = new StringBuilder();
+    char[] src = globExp.replace("**/*", "**").toCharArray();
+    int i = 0;
+    while (i < src.length) {
+      char c = src[i++];
+      switch (c) {
+        case '*':
+          // One char lookahead for **
+          if (i < src.length && src[i] == '*') {
+            dst.append(".*");
+            ++i;
+          } else {
+            dst.append("[^/]*");
+          }
+          break;
+        case '?':
+          dst.append("[^/]");
+          break;
+        case '.':
+        case '+':
+        case '{':
+        case '}':
+        case '(':
+        case ')':
+        case '|':
+        case '^':
+        case '$':
+          // These need to be escaped in regular expressions
+          dst.append('\\').append(c);
+          break;
+        case '\\':
+          i = doubleSlashes(dst, src, i);
+          break;
+        default:
+          dst.append(c);
+          break;
+      }
+    }
+    return dst.toString();
+  }
+
+  private static int doubleSlashes(StringBuilder dst, char[] src, int i) {
+    // Emit the next character without special interpretation
+    dst.append("\\\\");
+    if ((i - 1) != src.length) {
+      dst.append(src[i]);
+      i++;
+    } else {
+      // A backslash at the very end is treated like an escaped backslash
+      dst.append('\\');
+    }
+    return i;
+  }
+
+  private List<MatchResult> matchGlobPaths(List<AzfsResourceId> globs) {
+    return FluentIterable.from(globs).transform(this::expand).toList();
+  }
+
+  /** Expands a pattern into {@link MatchResult}. */
+  @VisibleForTesting
+  MatchResult expand(AzfsResourceId azfsPattern) {
+
+    checkArgument(azfsPattern.isWildcard(), "is Wildcard");
+    String blobPrefix = azfsPattern.getBlobNonWildcardPrefix();
+    Pattern wildcardAsRegexp = Pattern.compile(wildcardToRegexp(azfsPattern.getBlob()));
+
+    LOG.debug(
+        "matching files in container {}, prefix {} against pattern {}",
+        azfsPattern.getContainer(),
+        blobPrefix,
+        wildcardAsRegexp.toString());
+
+    ListBlobsOptions listOptions = new ListBlobsOptions().setPrefix(blobPrefix);
+    Duration timeout = Duration.ZERO.plusMinutes(1);
+
+    String account = azfsPattern.getAccount();
+    String container = azfsPattern.getContainer();
+    BlobContainerClient blobContainerClient = client.get().getBlobContainerClient(container);
+    PagedIterable<BlobItem> blobs = blobContainerClient.listBlobs(listOptions, timeout);
+    List<MatchResult.Metadata> results = new ArrayList<>();
+
+    blobs.forEach(
+        blob -> {
+          String name = blob.getName();
+          if (wildcardAsRegexp.matcher(name).matches() && !name.endsWith("/")) {
+            LOG.debug("Matched object: {}", name);

Review comment:
       it's not that important, but `name` here contains only the blob name, right? Can you include the container and account in the log?
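
       e.g. something like:

```java
LOG.debug("Matched object: account {}, container {}, blob {}", account, container, name);
```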

##########
File path: sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/options/package-info.java
##########
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/** Defines IO connectors for Microsoft Azure Blobstore. */
+@Experimental(Kind.FILESYSTEM)

Review comment:
       this comment is more related to the options classes, but you can add this annotation to AzureOptions and BlobstoreOptions, so that we will be able to move options around. As discussed in person, when in doubt about where to put an option, err on the side of caution and put it in BlobstoreOptions.
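
       For example (sketch; the annotation lives in `org.apache.beam.sdk.annotations`, and the getters already defined in this PR would stay unchanged):

```java
package org.apache.beam.sdk.io.azure.options;

import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.options.PipelineOptions;

/** Options used to configure Azure Blob Storage. */
@Experimental(Kind.FILESYSTEM)
public interface BlobstoreOptions extends PipelineOptions {
  // ... existing getters/setters from this PR ...
}
```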

##########
File path: sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java
##########
@@ -17,67 +17,438 @@
  */
 package org.apache.beam.sdk.io.azure.blobstore;
 
+import static java.nio.channels.Channels.newChannel;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.azure.core.http.rest.PagedIterable;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.BlobProperties;
+import com.azure.storage.blob.models.BlobStorageException;
+import com.azure.storage.blob.models.ListBlobsOptions;
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.SharedAccessAccountPolicy;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
 import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.io.FileSystem;
+import org.apache.beam.sdk.io.azure.options.BlobstoreClientBuilderFactory;
+import org.apache.beam.sdk.io.azure.options.BlobstoreOptions;
 import org.apache.beam.sdk.io.fs.CreateOptions;
 import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class AzureBlobStoreFileSystem extends FileSystem<AzfsResourceId> {
 
+  private static final Logger LOG = LoggerFactory.getLogger(AzureBlobStoreFileSystem.class);
+
+  private static final ImmutableSet<String> NON_READ_SEEK_EFFICIENT_ENCODINGS =
+      ImmutableSet.of("gzip");
+
+  private Supplier<BlobServiceClient> client;
+  private final BlobstoreOptions options;
+
+  AzureBlobStoreFileSystem(BlobstoreOptions options) {
+    this.options = checkNotNull(options, "options");
+
+    BlobServiceClientBuilder builder =
+        InstanceBuilder.ofType(BlobstoreClientBuilderFactory.class)
+            .fromClass(options.getBlobstoreClientFactoryClass())
+            .build()
+            .createBuilder(options);
+
+    // The Supplier is to make sure we don't call .build() unless we are actually using Azure.
+    client = Suppliers.memoize(builder::buildClient);
+  }
+
+  @VisibleForTesting
+  void setClient(BlobServiceClient client) {
+    this.client = Suppliers.ofInstance(client);
+  }
+
+  @VisibleForTesting
+  BlobServiceClient getClient() {
+    return client.get();
+  }
+
   @Override
   protected String getScheme() {
-    return "azfs";
+    return AzfsResourceId.SCHEME;
   }
 
   @Override
-  protected List<MatchResult> match(List<String> specs) throws IOException {
-    // TODO
-    return null;
+  protected List<MatchResult> match(List<String> specs) {
+    List<AzfsResourceId> paths =
+        specs.stream().map(AzfsResourceId::fromUri).collect(Collectors.toList());
+    List<AzfsResourceId> globs = new ArrayList<>();
+    List<AzfsResourceId> nonGlobs = new ArrayList<>();
+    List<Boolean> isGlobBooleans = new ArrayList<>();
+
+    for (AzfsResourceId path : paths) {
+      if (path.isWildcard()) {
+        globs.add(path);
+        isGlobBooleans.add(true);
+      } else {
+        nonGlobs.add(path);
+        isGlobBooleans.add(false);
+      }
+    }
+
+    Iterator<MatchResult> globMatches = matchGlobPaths(globs).iterator();
+    Iterator<MatchResult> nonGlobMatches = matchNonGlobPaths(nonGlobs).iterator();
+
+    ImmutableList.Builder<MatchResult> matchResults = ImmutableList.builder();
+    for (Boolean isGlob : isGlobBooleans) {
+      if (isGlob) {
+        checkState(globMatches.hasNext(), "Expect globMatches has next.");
+        matchResults.add(globMatches.next());
+      } else {
+        checkState(nonGlobMatches.hasNext(), "Expect nonGlobMatches has next.");
+        matchResults.add(nonGlobMatches.next());
+      }
+    }
+    checkState(!globMatches.hasNext(), "Expect no more elements in globMatches.");
+    checkState(!nonGlobMatches.hasNext(), "Expect no more elements in nonGlobMatches.");
+
+    return matchResults.build();
+  }
+
+  /**
+   * Expands glob expressions to regular expressions.
+   *
+   * @param globExp the glob expression to expand
+   * @return a string with the regular expression this glob expands to
+   */
+  @VisibleForTesting
+  static String wildcardToRegexp(String globExp) {
+    StringBuilder dst = new StringBuilder();
+    char[] src = globExp.replace("**/*", "**").toCharArray();
+    int i = 0;
+    while (i < src.length) {
+      char c = src[i++];
+      switch (c) {
+        case '*':
+          // One char lookahead for **
+          if (i < src.length && src[i] == '*') {
+            dst.append(".*");
+            ++i;
+          } else {
+            dst.append("[^/]*");
+          }
+          break;
+        case '?':
+          dst.append("[^/]");
+          break;
+        case '.':
+        case '+':
+        case '{':
+        case '}':
+        case '(':
+        case ')':
+        case '|':
+        case '^':
+        case '$':
+          // These need to be escaped in regular expressions
+          dst.append('\\').append(c);
+          break;
+        case '\\':
+          i = doubleSlashes(dst, src, i);
+          break;
+        default:
+          dst.append(c);
+          break;
+      }
+    }
+    return dst.toString();
+  }
+
+  private static int doubleSlashes(StringBuilder dst, char[] src, int i) {
+    // Emit the next character without special interpretation
+    dst.append("\\\\");
+    if ((i - 1) != src.length) {

Review comment:
       Should this be `i-1`, or should it actually be `i+1`? I understand this code comes from elsewhere (thus it's a good idea to standardize it and make it a utility). Can you add a unit test with a file pattern containing a backslash at the end to see what happens?
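
       A possible shape for that test (what it should ultimately assert depends on the intended semantics; as written it only checks that the translation succeeds and produces a valid regex):

```java
package org.apache.beam.sdk.io.azure.blobstore;

import java.util.regex.Pattern;
import org.junit.Test;

/** Sketch of the requested test; it would live next to the other AzureBlobStoreFileSystem tests. */
public class WildcardTrailingBackslashTest {

  @Test
  public void wildcardToRegexpHandlesTrailingBackslash() {
    // Exercises the end-of-input branch of doubleSlashes() questioned above.
    String regex = AzureBlobStoreFileSystem.wildcardToRegexp("foo\\");
    // At minimum the translation should not throw (e.g. ArrayIndexOutOfBoundsException)
    // and should yield a syntactically valid regular expression.
    Pattern.compile(regex);
  }
}
```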

##########
File path: sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java
##########
@@ -17,67 +17,438 @@
  */
 package org.apache.beam.sdk.io.azure.blobstore;
 
+import static java.nio.channels.Channels.newChannel;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.azure.core.http.rest.PagedIterable;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.BlobProperties;
+import com.azure.storage.blob.models.BlobStorageException;
+import com.azure.storage.blob.models.ListBlobsOptions;
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.SharedAccessAccountPolicy;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
 import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.io.FileSystem;
+import org.apache.beam.sdk.io.azure.options.BlobstoreClientBuilderFactory;
+import org.apache.beam.sdk.io.azure.options.BlobstoreOptions;
 import org.apache.beam.sdk.io.fs.CreateOptions;
 import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class AzureBlobStoreFileSystem extends FileSystem<AzfsResourceId> {
 
+  private static final Logger LOG = LoggerFactory.getLogger(AzureBlobStoreFileSystem.class);
+
+  private static final ImmutableSet<String> NON_READ_SEEK_EFFICIENT_ENCODINGS =
+      ImmutableSet.of("gzip");
+
+  private Supplier<BlobServiceClient> client;
+  private final BlobstoreOptions options;
+
+  AzureBlobStoreFileSystem(BlobstoreOptions options) {
+    this.options = checkNotNull(options, "options");
+
+    BlobServiceClientBuilder builder =
+        InstanceBuilder.ofType(BlobstoreClientBuilderFactory.class)
+            .fromClass(options.getBlobstoreClientFactoryClass())
+            .build()
+            .createBuilder(options);
+
+    // The Supplier is to make sure we don't call .build() unless we are actually using Azure.
+    client = Suppliers.memoize(builder::buildClient);
+  }
+
+  @VisibleForTesting
+  void setClient(BlobServiceClient client) {
+    this.client = Suppliers.ofInstance(client);
+  }
+
+  @VisibleForTesting
+  BlobServiceClient getClient() {
+    return client.get();
+  }
+
   @Override
   protected String getScheme() {
-    return "azfs";
+    return AzfsResourceId.SCHEME;
   }
 
   @Override
-  protected List<MatchResult> match(List<String> specs) throws IOException {
-    // TODO
-    return null;
+  protected List<MatchResult> match(List<String> specs) {
+    List<AzfsResourceId> paths =
+        specs.stream().map(AzfsResourceId::fromUri).collect(Collectors.toList());
+    List<AzfsResourceId> globs = new ArrayList<>();
+    List<AzfsResourceId> nonGlobs = new ArrayList<>();
+    List<Boolean> isGlobBooleans = new ArrayList<>();
+
+    for (AzfsResourceId path : paths) {
+      if (path.isWildcard()) {
+        globs.add(path);
+        isGlobBooleans.add(true);
+      } else {
+        nonGlobs.add(path);
+        isGlobBooleans.add(false);
+      }
+    }
+
+    Iterator<MatchResult> globMatches = matchGlobPaths(globs).iterator();
+    Iterator<MatchResult> nonGlobMatches = matchNonGlobPaths(nonGlobs).iterator();
+
+    ImmutableList.Builder<MatchResult> matchResults = ImmutableList.builder();
+    for (Boolean isGlob : isGlobBooleans) {
+      if (isGlob) {
+        checkState(globMatches.hasNext(), "Expect globMatches has next.");
+        matchResults.add(globMatches.next());
+      } else {
+        checkState(nonGlobMatches.hasNext(), "Expect nonGlobMatches has next.");
+        matchResults.add(nonGlobMatches.next());
+      }
+    }
+    checkState(!globMatches.hasNext(), "Expect no more elements in globMatches.");
+    checkState(!nonGlobMatches.hasNext(), "Expect no more elements in nonGlobMatches.");
+
+    return matchResults.build();
+  }
+
+  /**
+   * Expands glob expressions to regular expressions.
+   *
+   * @param globExp the glob expression to expand
+   * @return a string with the regular expression this glob expands to
+   */
+  @VisibleForTesting
+  static String wildcardToRegexp(String globExp) {
+    StringBuilder dst = new StringBuilder();
+    char[] src = globExp.replace("**/*", "**").toCharArray();
+    int i = 0;
+    while (i < src.length) {
+      char c = src[i++];
+      switch (c) {
+        case '*':
+          // One char lookahead for **
+          if (i < src.length && src[i] == '*') {
+            dst.append(".*");
+            ++i;
+          } else {
+            dst.append("[^/]*");
+          }
+          break;
+        case '?':
+          dst.append("[^/]");
+          break;
+        case '.':
+        case '+':
+        case '{':
+        case '}':
+        case '(':
+        case ')':
+        case '|':
+        case '^':
+        case '$':
+          // These need to be escaped in regular expressions
+          dst.append('\\').append(c);
+          break;
+        case '\\':
+          i = doubleSlashes(dst, src, i);
+          break;
+        default:
+          dst.append(c);
+          break;
+      }
+    }
+    return dst.toString();
+  }
+
+  private static int doubleSlashes(StringBuilder dst, char[] src, int i) {
+    // Emit the next character without special interpretation
+    dst.append("\\\\");
+    if ((i - 1) != src.length) {
+      dst.append(src[i]);
+      i++;
+    } else {
+      // A backslash at the very end is treated like an escaped backslash
+      dst.append('\\');
+    }
+    return i;
+  }
+
+  private List<MatchResult> matchGlobPaths(List<AzfsResourceId> globs) {
+    return FluentIterable.from(globs).transform(this::expand).toList();
+  }
+
+  /** Expands a pattern into {@link MatchResult}. */
+  @VisibleForTesting
+  MatchResult expand(AzfsResourceId azfsPattern) {
+
+    checkArgument(azfsPattern.isWildcard(), "is Wildcard");
+    String blobPrefix = azfsPattern.getBlobNonWildcardPrefix();
+    Pattern wildcardAsRegexp = Pattern.compile(wildcardToRegexp(azfsPattern.getBlob()));
+
+    LOG.debug(
+        "matching files in container {}, prefix {} against pattern {}",
+        azfsPattern.getContainer(),
+        blobPrefix,
+        wildcardAsRegexp.toString());
+
+    ListBlobsOptions listOptions = new ListBlobsOptions().setPrefix(blobPrefix);
+    Duration timeout = Duration.ZERO.plusMinutes(1);
+
+    String account = azfsPattern.getAccount();
+    String container = azfsPattern.getContainer();
+    BlobContainerClient blobContainerClient = client.get().getBlobContainerClient(container);
+    PagedIterable<BlobItem> blobs = blobContainerClient.listBlobs(listOptions, timeout);
+    List<MatchResult.Metadata> results = new ArrayList<>();
+
+    blobs.forEach(
+        blob -> {
+          String name = blob.getName();
+          if (wildcardAsRegexp.matcher(name).matches() && !name.endsWith("/")) {
+            LOG.debug("Matched object: {}", name);
+
+            BlobProperties properties = blobContainerClient.getBlobClient(name).getProperties();
+            AzfsResourceId rid =
+                AzfsResourceId.fromComponents(account, container, name)
+                    .withSize(properties.getBlobSize())
+                    .withLastModified(Date.from(properties.getLastModified().toInstant()));
+
+            results.add(toMetadata(rid, properties.getContentEncoding()));
+          }
+        });
+
+    return MatchResult.create(MatchResult.Status.OK, results);
+  }
+
+  private MatchResult.Metadata toMetadata(AzfsResourceId path, String contentEncoding) {
+
+    checkArgument(path.getSize().isPresent(), "path has size");
+    boolean isReadSeekEfficient = !NON_READ_SEEK_EFFICIENT_ENCODINGS.contains(contentEncoding);
+
+    return MatchResult.Metadata.builder()
+        .setIsReadSeekEfficient(isReadSeekEfficient)
+        .setResourceId(path)
+        .setSizeBytes(path.getSize().get())
+        .setLastModifiedMillis(path.getLastModified().transform(Date::getTime).or(0L))
+        .build();
+  }
+
+  /**
+   * Returns {@link MatchResult MatchResults} for the given {@link AzfsResourceId paths}.
+   *
+   * <p>The number of returned {@link MatchResult MatchResults} equals to the number of given {@link
+   * AzfsResourceId paths}. Each {@link MatchResult} contains one {@link MatchResult.Metadata}.
+   */
+  @VisibleForTesting
+  private Iterable<MatchResult> matchNonGlobPaths(List<AzfsResourceId> paths) {
+    ImmutableList.Builder<MatchResult> toReturn = ImmutableList.builder();
+    for (AzfsResourceId path : paths) {
+      toReturn.add(toMatchResult(path));
+    }
+    return toReturn.build();
+  }
+
+  private MatchResult toMatchResult(AzfsResourceId path) {
+    BlobClient blobClient =
+        client.get().getBlobContainerClient(path.getContainer()).getBlobClient(path.getBlob());
+    BlobProperties blobProperties;
+
+    try {
+      blobProperties = blobClient.getProperties();
+    } catch (BlobStorageException e) {
+      if (e.getStatusCode() == 404) {
+        return MatchResult.create(MatchResult.Status.NOT_FOUND, new FileNotFoundException());
+      }
+      return MatchResult.create(MatchResult.Status.ERROR, new IOException(e));
+    }
+
+    return MatchResult.create(
+        MatchResult.Status.OK,
+        ImmutableList.of(
+            toMetadata(
+                path.withSize(blobProperties.getBlobSize())
+                    .withLastModified(Date.from(blobProperties.getLastModified().toInstant())),
+                blobProperties.getContentEncoding())));
   }
 
   @Override
   protected WritableByteChannel create(AzfsResourceId resourceId, CreateOptions createOptions)
       throws IOException {
-    // TODO
-    return null;
+    BlobContainerClient blobContainerClient =
+        client.get().getBlobContainerClient(resourceId.getContainer());
+    if (!blobContainerClient.exists()) {
+      throw new UnsupportedOperationException("create does not create containers.");
+    }
+
+    BlobClient blobClient = blobContainerClient.getBlobClient(resourceId.getBlob());
+    // The getBlobOutputStream method overwrites existing blobs,
+    // so throw an error in this case to prevent data loss
+    if (blobClient.exists()) {
+      throw new IOException("This filename is already in use.");
+    }
+
+    OutputStream outputStream;
+    try {
+      outputStream = blobClient.getBlockBlobClient().getBlobOutputStream();
+    } catch (BlobStorageException e) {
+      throw (IOException) e.getCause();
+    }
+    return newChannel(outputStream);
   }
 
   @Override
   protected ReadableByteChannel open(AzfsResourceId resourceId) throws IOException {
-    // TODO
-    return null;
+    BlobClient blobClient =
+        client
+            .get()
+            .getBlobContainerClient(resourceId.getContainer())
+            .getBlobClient(resourceId.getBlob());
+    if (!blobClient.exists()) {
+      throw new FileNotFoundException("The requested file doesn't exist.");
+    }
+    return new AzureReadableSeekableByteChannel(blobClient);
   }
 
   @Override
   protected void copy(List<AzfsResourceId> srcPaths, List<AzfsResourceId> destPaths)
       throws IOException {
-    // TODO
+    checkArgument(
+        srcPaths.size() == destPaths.size(),
+        "sizes of source paths and destination paths do not match");
+
+    Iterator<AzfsResourceId> sourcePathsIterator = srcPaths.iterator();
+    Iterator<AzfsResourceId> destinationPathsIterator = destPaths.iterator();
+    while (sourcePathsIterator.hasNext()) {
+      final AzfsResourceId sourcePath = sourcePathsIterator.next();
+      final AzfsResourceId destinationPath = destinationPathsIterator.next();
+      copy(sourcePath, destinationPath);
+    }
   }
 
   @VisibleForTesting
   void copy(AzfsResourceId sourcePath, AzfsResourceId destinationPath) throws IOException {
-    // TODO
+    checkArgument(
+        sourcePath.getBlob() != null && destinationPath.getBlob() != null,
+        "This method is intended to copy file-like resources, not directories.");
+
+    // get source blob client
+    BlobClient srcBlobClient =
+        client
+            .get()
+            .getBlobContainerClient(sourcePath.getContainer())
+            .getBlobClient(sourcePath.getBlob());
+    if (!srcBlobClient.exists()) {
+      throw new FileNotFoundException("The copy source does not exist.");
+    }
+
+    // get destination blob client
+    BlobContainerClient destBlobContainerClient =
+        client.get().getBlobContainerClient(destinationPath.getContainer());
+    if (!destBlobContainerClient.exists()) {
+      client.get().createBlobContainer(destinationPath.getContainer());

Review comment:
       Can you `LOG.info` the fact that a container is being created?
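
       e.g.:

```java
LOG.info(
    "Destination container {} does not exist, creating it before copying {}.",
    destinationPath.getContainer(),
    sourcePath);
client.get().createBlobContainer(destinationPath.getContainer());
```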

##########
File path: sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java
##########
@@ -17,67 +17,438 @@
  */
 package org.apache.beam.sdk.io.azure.blobstore;
 
+import static java.nio.channels.Channels.newChannel;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.azure.core.http.rest.PagedIterable;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.BlobProperties;
+import com.azure.storage.blob.models.BlobStorageException;
+import com.azure.storage.blob.models.ListBlobsOptions;
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.SharedAccessAccountPolicy;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
 import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.io.FileSystem;
+import org.apache.beam.sdk.io.azure.options.BlobstoreClientBuilderFactory;
+import org.apache.beam.sdk.io.azure.options.BlobstoreOptions;
 import org.apache.beam.sdk.io.fs.CreateOptions;
 import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class AzureBlobStoreFileSystem extends FileSystem<AzfsResourceId> {
 
+  private static final Logger LOG = LoggerFactory.getLogger(AzureBlobStoreFileSystem.class);
+
+  private static final ImmutableSet<String> NON_READ_SEEK_EFFICIENT_ENCODINGS =
+      ImmutableSet.of("gzip");
+
+  private Supplier<BlobServiceClient> client;
+  private final BlobstoreOptions options;
+
+  AzureBlobStoreFileSystem(BlobstoreOptions options) {
+    this.options = checkNotNull(options, "options");
+
+    BlobServiceClientBuilder builder =
+        InstanceBuilder.ofType(BlobstoreClientBuilderFactory.class)
+            .fromClass(options.getBlobstoreClientFactoryClass())
+            .build()
+            .createBuilder(options);
+
+    // The Supplier is to make sure we don't call .build() unless we are actually using Azure.
+    client = Suppliers.memoize(builder::buildClient);
+  }
+
+  @VisibleForTesting
+  void setClient(BlobServiceClient client) {
+    this.client = Suppliers.ofInstance(client);
+  }
+
+  @VisibleForTesting
+  BlobServiceClient getClient() {
+    return client.get();
+  }
+
   @Override
   protected String getScheme() {
-    return "azfs";
+    return AzfsResourceId.SCHEME;
   }
 
   @Override
-  protected List<MatchResult> match(List<String> specs) throws IOException {
-    // TODO
-    return null;
+  protected List<MatchResult> match(List<String> specs) {
+    List<AzfsResourceId> paths =
+        specs.stream().map(AzfsResourceId::fromUri).collect(Collectors.toList());
+    List<AzfsResourceId> globs = new ArrayList<>();
+    List<AzfsResourceId> nonGlobs = new ArrayList<>();
+    List<Boolean> isGlobBooleans = new ArrayList<>();
+
+    for (AzfsResourceId path : paths) {
+      if (path.isWildcard()) {
+        globs.add(path);
+        isGlobBooleans.add(true);
+      } else {
+        nonGlobs.add(path);
+        isGlobBooleans.add(false);
+      }
+    }
+
+    Iterator<MatchResult> globMatches = matchGlobPaths(globs).iterator();
+    Iterator<MatchResult> nonGlobMatches = matchNonGlobPaths(nonGlobs).iterator();
+
+    ImmutableList.Builder<MatchResult> matchResults = ImmutableList.builder();
+    for (Boolean isGlob : isGlobBooleans) {
+      if (isGlob) {
+        checkState(globMatches.hasNext(), "Expected another element in globMatches.");
+        matchResults.add(globMatches.next());
+      } else {
+        checkState(nonGlobMatches.hasNext(), "Expected another element in nonGlobMatches.");
+        matchResults.add(nonGlobMatches.next());
+      }
+    }
+    checkState(!globMatches.hasNext(), "Expected no more elements in globMatches.");
+    checkState(!nonGlobMatches.hasNext(), "Expected no more elements in nonGlobMatches.");
+
+    return matchResults.build();
+  }
+
+  /**
+   * Expands glob expressions to regular expressions.
+   *
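+   * <p>For example, {@code "foo*.txt"} expands to {@code "foo[^/]*\.txt"}, {@code "a/**"}
+   * expands to {@code "a/.*"}, and {@code "part-?"} expands to {@code "part-[^/]"}.
+   *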
+   * @param globExp the glob expression to expand
+   * @return a string with the regular expression this glob expands to
+   */
+  @VisibleForTesting
+  static String wildcardToRegexp(String globExp) {
+    StringBuilder dst = new StringBuilder();
+    char[] src = globExp.replace("**/*", "**").toCharArray();
+    int i = 0;
+    while (i < src.length) {
+      char c = src[i++];
+      switch (c) {
+        case '*':
+          // One char lookahead for **
+          if (i < src.length && src[i] == '*') {
+            dst.append(".*");
+            ++i;
+          } else {
+            dst.append("[^/]*");
+          }
+          break;
+        case '?':
+          dst.append("[^/]");
+          break;
+        case '.':
+        case '+':
+        case '{':
+        case '}':
+        case '(':
+        case ')':
+        case '|':
+        case '^':
+        case '$':
+          // These need to be escaped in regular expressions
+          dst.append('\\').append(c);
+          break;
+        case '\\':
+          i = doubleSlashes(dst, src, i);
+          break;
+        default:
+          dst.append(c);
+          break;
+      }
+    }
+    return dst.toString();
+  }
+
+  private static int doubleSlashes(StringBuilder dst, char[] src, int i) {
+    // Emit the next character without special interpretation
+    dst.append("\\\\");
+    if (i < src.length) {
+      dst.append(src[i]);
+      i++;
+    } else {
+      // A backslash at the very end is treated like an escaped backslash
+      dst.append('\\');
+    }
+    return i;
+  }
+
+  private List<MatchResult> matchGlobPaths(List<AzfsResourceId> globs) {
+    return FluentIterable.from(globs).transform(this::expand).toList();
+  }
+
+  /** Expands a pattern into {@link MatchResult}. */
+  @VisibleForTesting
+  MatchResult expand(AzfsResourceId azfsPattern) {
+
+    checkArgument(azfsPattern.isWildcard(), "expected a wildcard pattern");
+    String blobPrefix = azfsPattern.getBlobNonWildcardPrefix();
+    Pattern wildcardAsRegexp = Pattern.compile(wildcardToRegexp(azfsPattern.getBlob()));
+
+    LOG.debug(
+        "matching files in container {}, prefix {} against pattern {}",
+        azfsPattern.getContainer(),
+        blobPrefix,
+        wildcardAsRegexp.toString());
+
+    ListBlobsOptions listOptions = new ListBlobsOptions().setPrefix(blobPrefix);
+    Duration timeout = Duration.ofMinutes(1);
+
+    String account = azfsPattern.getAccount();
+    String container = azfsPattern.getContainer();
+    BlobContainerClient blobContainerClient = client.get().getBlobContainerClient(container);
+    PagedIterable<BlobItem> blobs = blobContainerClient.listBlobs(listOptions, timeout);
+    List<MatchResult.Metadata> results = new ArrayList<>();
+
+    blobs.forEach(
+        blob -> {
+          String name = blob.getName();
+          if (wildcardAsRegexp.matcher(name).matches() && !name.endsWith("/")) {
+            LOG.debug("Matched object: {}", name);
+
+            BlobProperties properties = blobContainerClient.getBlobClient(name).getProperties();
+            AzfsResourceId rid =
+                AzfsResourceId.fromComponents(account, container, name)
+                    .withSize(properties.getBlobSize())
+                    .withLastModified(Date.from(properties.getLastModified().toInstant()));
+
+            results.add(toMetadata(rid, properties.getContentEncoding()));
+          }
+        });
+
+    return MatchResult.create(MatchResult.Status.OK, results);
+  }
+
+  private MatchResult.Metadata toMetadata(AzfsResourceId path, String contentEncoding) {
+
+    checkArgument(path.getSize().isPresent(), "path must have a size");
+    boolean isReadSeekEfficient = !NON_READ_SEEK_EFFICIENT_ENCODINGS.contains(contentEncoding);
+
+    return MatchResult.Metadata.builder()
+        .setIsReadSeekEfficient(isReadSeekEfficient)
+        .setResourceId(path)
+        .setSizeBytes(path.getSize().get())
+        .setLastModifiedMillis(path.getLastModified().transform(Date::getTime).or(0L))
+        .build();
+  }
+
+  /**
+   * Returns {@link MatchResult MatchResults} for the given {@link AzfsResourceId paths}.
+   *
+   * <p>The number of returned {@link MatchResult MatchResults} equals the number of given {@link
+   * AzfsResourceId paths}. Each {@link MatchResult} contains one {@link MatchResult.Metadata}.
+   */
+  @VisibleForTesting
+  Iterable<MatchResult> matchNonGlobPaths(List<AzfsResourceId> paths) {
+    ImmutableList.Builder<MatchResult> toReturn = ImmutableList.builder();
+    for (AzfsResourceId path : paths) {
+      toReturn.add(toMatchResult(path));
+    }
+    return toReturn.build();
+  }
+
+  private MatchResult toMatchResult(AzfsResourceId path) {
+    BlobClient blobClient =
+        client.get().getBlobContainerClient(path.getContainer()).getBlobClient(path.getBlob());
+    BlobProperties blobProperties;
+
+    try {
+      blobProperties = blobClient.getProperties();
+    } catch (BlobStorageException e) {
+      if (e.getStatusCode() == 404) {
+        return MatchResult.create(MatchResult.Status.NOT_FOUND, new FileNotFoundException());
+      }
+      return MatchResult.create(MatchResult.Status.ERROR, new IOException(e));
+    }
+
+    return MatchResult.create(
+        MatchResult.Status.OK,
+        ImmutableList.of(
+            toMetadata(
+                path.withSize(blobProperties.getBlobSize())
+                    .withLastModified(Date.from(blobProperties.getLastModified().toInstant())),
+                blobProperties.getContentEncoding())));
   }
 
   @Override
   protected WritableByteChannel create(AzfsResourceId resourceId, CreateOptions createOptions)
       throws IOException {
-    // TODO
-    return null;
+    BlobContainerClient blobContainerClient =
+        client.get().getBlobContainerClient(resourceId.getContainer());
+    if (!blobContainerClient.exists()) {
+      throw new UnsupportedOperationException("create does not create containers.");
+    }
+
+    BlobClient blobClient = blobContainerClient.getBlobClient(resourceId.getBlob());
+    // The getBlobOutputStream method overwrites existing blobs,
+    // so fail here instead to prevent accidental data loss.
+    if (blobClient.exists()) {
+      throw new IOException("This filename is already in use.");
+    }
+
+    OutputStream outputStream;
+    try {
+      outputStream = blobClient.getBlockBlobClient().getBlobOutputStream();
+    } catch (BlobStorageException e) {
+      throw new IOException(e);
+    }
+    return newChannel(outputStream);
   }
 
   @Override
   protected ReadableByteChannel open(AzfsResourceId resourceId) throws IOException {
-    // TODO
-    return null;
+    BlobClient blobClient =
+        client
+            .get()
+            .getBlobContainerClient(resourceId.getContainer())
+            .getBlobClient(resourceId.getBlob());
+    if (!blobClient.exists()) {
+      throw new FileNotFoundException("The requested file doesn't exist.");
+    }
+    return new AzureReadableSeekableByteChannel(blobClient);
   }
 
   @Override
   protected void copy(List<AzfsResourceId> srcPaths, List<AzfsResourceId> destPaths)
       throws IOException {
-    // TODO
+    checkArgument(
+        srcPaths.size() == destPaths.size(),
+        "sizes of source paths and destination paths do not match");
+
+    Iterator<AzfsResourceId> sourcePathsIterator = srcPaths.iterator();
+    Iterator<AzfsResourceId> destinationPathsIterator = destPaths.iterator();
+    while (sourcePathsIterator.hasNext()) {
+      final AzfsResourceId sourcePath = sourcePathsIterator.next();
+      final AzfsResourceId destinationPath = destinationPathsIterator.next();
+      copy(sourcePath, destinationPath);
+    }
   }
 
   @VisibleForTesting
   void copy(AzfsResourceId sourcePath, AzfsResourceId destinationPath) throws IOException {
-    // TODO
+    checkArgument(
+        sourcePath.getBlob() != null && destinationPath.getBlob() != null,
+        "This method is intended to copy file-like resources, not directories.");
+
+    // get source blob client
+    BlobClient srcBlobClient =
+        client
+            .get()
+            .getBlobContainerClient(sourcePath.getContainer())
+            .getBlobClient(sourcePath.getBlob());
+    if (!srcBlobClient.exists()) {
+      throw new FileNotFoundException("The copy source does not exist.");
+    }
+
+    // get destination blob client
+    BlobContainerClient destBlobContainerClient =
+        client.get().getBlobContainerClient(destinationPath.getContainer());
+    if (!destBlobContainerClient.exists()) {
+      client.get().createBlobContainer(destinationPath.getContainer());
+    }
+    BlobClient destBlobClient = destBlobContainerClient.getBlobClient(destinationPath.getBlob());
+
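+    // copyFromUrl requires the source blob to be readable at its URL, so append a SAS token to authorize it.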
+    destBlobClient.copyFromUrl(srcBlobClient.getBlobUrl() + generateSasToken());
+  }
+
+  @VisibleForTesting
+  String generateSasToken() throws IOException {
+    SharedAccessAccountPolicy sharedAccessAccountPolicy = new SharedAccessAccountPolicy();
+    long date = new Date().getTime();
+    // The token below is valid for 8,640,000 ms (2.4 hours) from the time it is generated.
+    long expiryDate = new Date(date + 8640000).getTime();
+
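+    // "racwdlup" = read, add, create, write, delete, list, update, process.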
+    sharedAccessAccountPolicy.setPermissionsFromString("racwdlup");
+    sharedAccessAccountPolicy.setSharedAccessStartTime(new Date(date));
+    sharedAccessAccountPolicy.setSharedAccessExpiryTime(new Date(expiryDate));
+    sharedAccessAccountPolicy.setResourceTypeFromString(
+        "co"); // container, object, add s for service
+    sharedAccessAccountPolicy.setServiceFromString("b"); // blob, add "fqt" for file, queue, table
+
+    String storageConnectionString = options.getAzureConnectionString();
+    try {
+      CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString);
+      return "?" + storageAccount.generateSharedAccessSignature(sharedAccessAccountPolicy);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
   }
 
   @Override
   protected void rename(List<AzfsResourceId> srcResourceIds, List<AzfsResourceId> destResourceIds)
       throws IOException {
-    // TODO
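+    // Azure Blob Storage has no rename operation, so emulate it with a copy followed by a delete.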
+    copy(srcResourceIds, destResourceIds);
+    delete(srcResourceIds);
   }
 
+  // This method will delete a virtual folder or a blob.
   @Override
   protected void delete(Collection<AzfsResourceId> resourceIds) throws IOException {
-    // TODO
+    for (AzfsResourceId resourceId : resourceIds) {
+      if (resourceId.getBlob() == null) {
+        throw new IOException("delete does not delete containers.");
+      }
+
+      BlobContainerClient container =
+          client.get().getBlobContainerClient(resourceId.getContainer());
+
+      // deleting a blob that is not a directory
+      if (!resourceId.isDirectory()) {
+        BlobClient blob = container.getBlobClient(resourceId.getBlob());
+        if (!blob.exists()) {
+          throw new FileNotFoundException("The resource to delete does not exist.");
+        }
+        blob.delete();
+      }
+

Review comment:
       remove this line

##########
File path: sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/options/AzureOptions.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.azure.options;
+
+import com.azure.core.credential.TokenCredential;
+import com.azure.core.util.Configuration;
+import com.azure.identity.DefaultAzureCredentialBuilder;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+public interface AzureOptions extends PipelineOptions {
+
+  // TODO: Add any other azure options that users should be able to configure
+  // TODO: Confirm that Azure options are in this file, Blobstore options in BlobstoreOptions
+
+  /** The Azure service endpoint used by the Azure client. */
+  @Description("Azure service endpoint used by the Azure client")
+  String getAzureServiceEndpoint();
+
+  void setAzureServiceEndpoint(String value);
+
+  /**
+   * The credential instance that should be used to authenticate against Azure services. The option
+   * value must contain a "@type" field and an Azure credentials provider class as the field value.
+   */
+  @Description(
+      "The credential instance that should be used to authenticate "
+          + "against Azure services. The option value must contain \"@type\" field "
+          + "and an Azure credentials provider class name as the field value.")
+  @Default.InstanceFactory(AzureUserCredentialsFactory.class)
+  TokenCredential getAzureCredentialsProvider();
+
+  void setAzureCredentialsProvider(TokenCredential value);
+
+  /** Attempts to load Azure credentials. */
+  class AzureUserCredentialsFactory implements DefaultValueFactory<TokenCredential> {
+
+    @Override
+    public TokenCredential create(PipelineOptions options) {
+      return new DefaultAzureCredentialBuilder().build();
+    }
+  }
+
+  /** The client configuration instance that should be used to configure Azure service clients. */
+  @Description(
+      "The client configuration instance that should be used to configure Azure service clients")
+  @Default.InstanceFactory(ConfigurationFactory.class)
+  Configuration getClientConfiguration();
+
+  void setClientConfiguration(Configuration configuration);
+
+  /** The client configuration instance that should be used to configure Azure service clients. */
+  @Description(
+      "The client configuration instance that should be used to configure Azure http client configuration parameters."
+          + "Mentioned parameters are the available parameters that can be set. Set only those that need custom changes.")
+  @Default.InstanceFactory(ConfigurationFactory.class)
+  Configuration getAzureHttpConfiguration();
+
+  void setAzureHttpConfiguration(Configuration configuration);
+
+  /** Default Azure client configuration. */
+  class ConfigurationFactory implements DefaultValueFactory<Configuration> {
+
+    @Override
+    public Configuration create(PipelineOptions options) {
+      return new Configuration();
+    }
+  }

Review comment:
       These configuration-related variables seem to be non-serializable. I think we need to implement the special Json serializer before moving forward with adding these changes. We can remove these changes from this PR (and rely purely on a `String`-type connectionString) and add them in a follow-up PR, or get the Json serializer in this PR. What do you think?
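
       For illustration only, here is a minimal sketch of what that serializer could look like, assuming we follow the usual Jackson-module pattern Beam uses to discover serializers for option types; class names like `AzureModule`, `ConfigurationSerializer`, and `ConfigurationDeserializer` are hypothetical placeholders, not code from this PR:

```java
import com.azure.core.util.Configuration;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.auto.service.AutoService;
import java.io.IOException;
import java.util.Map;

/** Hypothetical Jackson module so Configuration-typed options can round-trip through JSON. */
@AutoService(Module.class)
public class AzureModule extends SimpleModule {

  public AzureModule() {
    super("AzureModule");
    addSerializer(Configuration.class, new ConfigurationSerializer());
    addDeserializer(Configuration.class, new ConfigurationDeserializer());
  }

  private static class ConfigurationSerializer extends JsonSerializer<Configuration> {
    @Override
    public void serialize(Configuration config, JsonGenerator gen, SerializerProvider provider)
        throws IOException {
      // Persist the configuration as a flat string-to-string JSON object.
      gen.writeStartObject();
      // A real implementation would decide here which Configuration properties to write out.
      gen.writeEndObject();
    }
  }

  private static class ConfigurationDeserializer extends JsonDeserializer<Configuration> {
    @Override
    public Configuration deserialize(JsonParser parser, DeserializationContext context)
        throws IOException {
      // Read the flat map back and rebuild a Configuration from it.
      Map<String, String> properties = parser.readValueAs(Map.class);
      Configuration config = new Configuration();
      properties.forEach(config::put);
      return config;
    }
  }
}
```

       Deciding which `Configuration` properties are actually worth persisting is part of that work, which is one more reason a follow-up PR seems reasonable.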




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org