Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/03/31 09:09:03 UTC

[GitHub] [pulsar] hangc0276 commented on a change in pull request #14930: Offloader: add API to scan objects on Tiered Storage

hangc0276 commented on a change in pull request #14930:
URL: https://github.com/apache/pulsar/pull/14930#discussion_r839356726



##########
File path: tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
##########
@@ -621,4 +634,75 @@ public void close() {
             }
         }
     }
+
+    @Override
+    public void scanLedgers(OffloadedLedgerMetadataConsumer consumer, Map<String, String> offloadDriverMetadata) throws ManagedLedgerException {
+        BlobStoreLocation bsKey = getBlobStoreLocation(offloadDriverMetadata);
+        String endpoint = bsKey.getEndpoint();
+        String readBucket = bsKey.getBucket();
+        log.info("Scanning bucket {}, bsKey {}, location {}, endpoint {}", readBucket, bsKey,
+                config.getBlobStoreLocation(), endpoint);
+        BlobStore readBlobstore = blobStores.get(config.getBlobStoreLocation());
+        int batchSize = 100;
+        String bucketName = config.getBucket();
+        String marker = null;
+        do {
+            marker = scanContainer(consumer, readBlobstore, bucketName, marker, batchSize);
+        } while (marker != null);
+
+    }
+
+    private String scanContainer(OffloadedLedgerMetadataConsumer consumer, BlobStore readBlobstore,
+                                 String bucketName,
+                                 String lastMarker,
+                                 int batchSize) throws ManagedLedgerException {
+        ListContainerOptions options = new ListContainerOptions()
+                .maxResults(batchSize)
+                .withDetails();
+        if (lastMarker != null) {
+            options.afterMarker(lastMarker);
+        }
+        PageSet<? extends StorageMetadata> pages = readBlobstore.list(bucketName, options);
+        for (StorageMetadata md : pages) {
+            log.info("Found {}", md);
+            String name = md.getName();
+            Long size = md.getSize();
+            Date lastModified = md.getLastModified();
+            StorageType type = md.getType();
+            if (type != StorageType.BLOB) {
+                continue;
+            }
+            URI uri = md.getUri();
+            Map<String, String> userMetadata = md.getUserMetadata();

Review comment:
       Yes, the ledger metadata is stored in the index file: https://github.com/apache/pulsar/blob/02eb31b372b2bf72350e8f6cbab552e1627e6197/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java#L172-L174
   
   `md.getUserMetadata()` only returns the following properties:
   https://github.com/apache/pulsar/blob/02eb31b372b2bf72350e8f6cbab552e1627e6197/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java#L1259-L1262
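Because the ledger metadata lives in the index object rather than in the blob's user metadata, a scanner would first need to tell index objects apart from data objects. A minimal sketch, assuming offloaded objects are named `<uuid>-ledger-<ledgerId>` for data and `<uuid>-ledger-<ledgerId>-index` for the index (an assumption to verify against the offloader source); `OffloadKeyParser` and `OffloadKey` are hypothetical names, not Pulsar APIs:

```java
import java.util.Optional;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical helper that classifies a blob name from the offload bucket.
 * ASSUMPTION: data objects are named "<uuid>-ledger-<ledgerId>" and index
 * objects "<uuid>-ledger-<ledgerId>-index".
 */
public final class OffloadKeyParser {

    /** Parsed form of an offloaded object key. */
    public record OffloadKey(UUID uuid, long ledgerId, boolean index) {}

    // Canonical 8-4-4-4-12 hex UUID, then "-ledger-", a decimal ledger id,
    // and an optional "-index" suffix.
    private static final Pattern KEY = Pattern.compile(
            "^([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})"
            + "-ledger-(\\d+)(-index)?$");

    private OffloadKeyParser() {}

    /** Returns the parsed key, or empty if the name does not follow the assumed scheme. */
    public static Optional<OffloadKey> parse(String blobName) {
        Matcher m = KEY.matcher(blobName);
        if (!m.matches()) {
            return Optional.empty();
        }
        try {
            UUID uuid = UUID.fromString(m.group(1));
            long ledgerId = Long.parseLong(m.group(2));
            return Optional.of(new OffloadKey(uuid, ledgerId, m.group(3) != null));
        } catch (NumberFormatException e) {
            // Ledger id does not fit in a long: treat the object as unrecognized.
            return Optional.empty();
        }
    }
}
```

A scan loop could call `parse(md.getName())` on each listed blob and open only the objects whose `index()` flag is set; names that do not match are reported as unrecognized rather than guessed at.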

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
##########
@@ -2795,6 +2802,111 @@ protected void internalSetNamespaceResourceGroup(String rgName) {
         internalSetPolicies("resource_group_name", rgName);
     }
 
+    protected Map<String, Object> internalScanOffloadedLedgers() throws Exception {
+        log.info("internalScanOffloadedLedgers {}", namespaceName);
+        validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.READ);
+
+        Policies policies = getNamespacePolicies(namespaceName);
+        LedgerOffloader managedLedgerOffloader = pulsar()
+                .getManagedLedgerOffloader(namespaceName, (OffloadPoliciesImpl) policies.offload_policies);
+
+        String localClusterName = pulsar().getConfiguration().getClusterName();
+        Map<String, Object> topLevelResult = new HashMap<>();
+        List<Map<String, Object>> objects = new ArrayList<>();
+        topLevelResult.put("objects", objects);
+        AtomicInteger totalCount = new AtomicInteger();
+        AtomicInteger totalErrors = new AtomicInteger();
+        AtomicInteger totalUnknown = new AtomicInteger();
+        managedLedgerOffloader.scanLedgers((md -> {
+            log.info("Found ledger {}", md);
+            Map<String, Object> objectInfo = new HashMap<>();
+            objectInfo.put("ledger", md.getLedgerId());
+            objectInfo.put("name", md.getName());
+            objectInfo.put("uri", md.getUri());
+            objectInfo.put("uuid", md.getUuid());
+            objectInfo.put("size", md.getSize());
+            objectInfo.put("lastModified", md.getLastModified());
+            objectInfo.put("userMetadata", md.getUserMetadata());
+
+            String status = "UNKNOWN";
+
+            if (md.getUserMetadata() != null) {
+                // case-insensitive key lookup
+                TreeMap<String, String> userMetadata = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+                userMetadata.putAll(md.getUserMetadata());
+                String clusterName = userMetadata.get(LedgerOffloader.METADATA_PULSAR_CLUSTER_NAME);
+                if (localClusterName.equals(clusterName)) {
+                    String managedLedgerName = userMetadata.get("managedledgername");
+                    if (managedLedgerName != null) {
+                        objectInfo.put("managedLedgerName", managedLedgerName);
+                        try {
+                            status = checkLedgerShouldBeOnTieredStorage(md.getLedgerId(), md.getUuid(),
+                                    managedLedgerName, objectInfo, pulsar().getManagedLedgerFactory());
+                        } catch (InterruptedException err) {
+                            Thread.currentThread().interrupt();
+                            throw new RuntimeException(err);
+                        } catch (ManagedLedgerException err) {
+                            log.error("Error while checking managed ledger {}", managedLedgerName);
+                            throw new RuntimeException(err);
+                        }
+                    }
+                }
+            }
+            totalCount.incrementAndGet();
+            objectInfo.put("status", status);
+            switch (status) {
+                case "OK":
+                    break;
+                case "UNKNOWN":
+                    totalUnknown.incrementAndGet();
+                    break;
+                default:
+                    totalErrors.incrementAndGet();
+                    break;
+            }
+
+            objects.add(objectInfo);
+            return true;
+        }), managedLedgerOffloader.getOffloadDriverMetadata());
+        topLevelResult.put("errors", totalErrors.intValue());
+        topLevelResult.put("total", totalCount.intValue());
+        topLevelResult.put("unknownObjects", totalUnknown.intValue());
+        log.info("internalScanOffloadedLedgers {} scan finished", namespaceName);
+
+        return topLevelResult;

Review comment:
       `topLevelResult` holds too much information, and returning it to the client over REST is not a good solution: the number of entries in `topLevelResult` depends on how many topics are in this namespace.
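One possible mitigation, sketched here under assumptions and not what the PR does, is to keep counters for every object plus only a capped number of non-OK samples, so the response size no longer grows with the namespace. `BoundedScanSummary` is a hypothetical class; the `"OK"` and `"UNKNOWN"` status strings mirror the ones used in `internalScanOffloadedLedgers`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical bounded scan summary: every object is counted, but full details
 * are kept only for the first maxSamples objects whose status is not "OK".
 */
public final class BoundedScanSummary {
    private final int maxSamples;
    private long total;
    private long errors;
    private long unknown;
    private final List<Map<String, Object>> samples = new ArrayList<>();

    public BoundedScanSummary(int maxSamples) {
        if (maxSamples < 0) {
            throw new IllegalArgumentException("maxSamples must be >= 0");
        }
        this.maxSamples = maxSamples;
    }

    /** Records one scanned object; "OK" objects only increment the total. */
    public synchronized void record(String status, Map<String, Object> objectInfo) {
        total++;
        switch (status) {
            case "OK":
                return;
            case "UNKNOWN":
                unknown++;
                break;
            default:
                errors++;
                break;
        }
        // Retain details only while under the cap.
        if (samples.size() < maxSamples) {
            samples.add(objectInfo);
        }
    }

    public synchronized long total() { return total; }
    public synchronized long errors() { return errors; }
    public synchronized long unknown() { return unknown; }

    /** A read-only copy of the retained non-OK samples. */
    public synchronized List<Map<String, Object>> samples() {
        return Collections.unmodifiableList(new ArrayList<>(samples));
    }

    /** True if some non-OK objects were counted but not retained as samples. */
    public synchronized boolean truncated() {
        return errors + unknown > samples.size();
    }
}
```

The `synchronized` methods play the role of the original `AtomicInteger` counters, in case the consumer callback is invoked from more than one thread.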

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
##########
@@ -1965,5 +1966,22 @@ public void removeNamespaceResourceGroup(@PathParam("tenant") String tenant,
         internalSetNamespaceResourceGroup(null);
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/scanOffloadedLedgers")
+    @ApiOperation(value = "Trigger the scan of offloaded Ledgers on the LedgerOffloader for the given namespace")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace doesn't exist") })
+    public Map<String, Object> scanOffloadedLedgers(@PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace) {
+        validateNamespaceName(tenant, namespace);
+        try {
+            return internalScanOffloadedLedgers();

Review comment:
       This operation is time-consuming. The client uses the REST API to trigger the scan and then waits for the response, so the request may time out on the client side.
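A common pattern for long-running admin operations is to start the work in the background, return an identifier right away, and let the client poll for the result, so no single HTTP request has to outlive the scan. A minimal sketch of that pattern; `ScanJobRegistry` and the start/poll endpoints it implies are hypothetical, not part of Pulsar's admin API:

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

/** Hypothetical registry of background scans, keyed by a generated id. */
public final class ScanJobRegistry<R> {
    public enum State { RUNNING, DONE, FAILED }

    /** Snapshot of a job as returned to a polling client. */
    public record Status<T>(State state, T result, String error) {}

    private final Executor executor;
    private final Map<String, CompletableFuture<R>> jobs = new ConcurrentHashMap<>();

    public ScanJobRegistry(Executor executor) {
        this.executor = executor;
    }

    /** Starts the scan on the executor and returns its id without waiting for it. */
    public String start(Supplier<R> scan) {
        String id = UUID.randomUUID().toString();
        jobs.put(id, CompletableFuture.supplyAsync(scan, executor));
        return id;
    }

    /** Non-blocking status lookup; empty if the id is unknown. */
    public Optional<Status<R>> status(String id) {
        CompletableFuture<R> f = jobs.get(id);
        if (f == null) {
            return Optional.empty();
        }
        if (!f.isDone()) {
            return Optional.of(new Status<>(State.RUNNING, null, null));
        }
        try {
            // The future is already complete, so join() does not block here.
            return Optional.of(new Status<>(State.DONE, f.join(), null));
        } catch (CompletionException | CancellationException e) {
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            return Optional.of(new Status<>(State.FAILED, null, cause.toString()));
        }
    }
}
```

Passing `Runnable::run` as the executor runs the scan inline, which is convenient for tests; a broker would pass a dedicated thread pool instead. The sketch never evicts finished jobs and keeps them on a single broker, so a real implementation would also need expiry and a way to route polls to the broker that ran the scan.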



