You are viewing a plain-text version of this content; the link to the canonical version is not preserved in this rendering.
Posted to commits@tika.apache.org by ta...@apache.org on 2022/03/24 21:45:39 UTC

[tika] branch main updated: TIKA-3707 -- add fetcher and emitter for Azure blob storage -- add pipes iterator and clean up some other stuff.

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/main by this push:
     new 2945393  TIKA-3707 -- add fetcher and emitter for Azure blob storage -- add pipes iterator and clean up some other stuff.
2945393 is described below

commit 294539352773d69b675e9048523607ba55792a87
Author: tallison <ta...@apache.org>
AuthorDate: Thu Mar 24 17:45:23 2022 -0400

    TIKA-3707 -- add fetcher and emitter for Azure blob storage -- add pipes iterator and clean up some other stuff.
---
 tika-parent/pom.xml                                |   7 +
 .../tika-emitters/tika-emitter-az-blob/pom.xml     |   3 +-
 .../tika-fetchers/tika-fetcher-az-blob/pom.xml     |   3 +-
 tika-pipes/tika-pipes-iterators/pom.xml            |   1 +
 .../pom.xml                                        |  11 +-
 .../pipesiterator/azblob/AZBlobPipesIterator.java  | 155 +++++++++++++++++++++
 .../azblob/TestAZBlobPipesIterator.java            |  99 +++++++++++++
 .../src/test/resources/log4j2.xml                  |  32 +++++
 .../tika-pipes-iterator-gcs/pom.xml                |   2 +-
 .../src/test/resources/log4j.properties            |  22 ---
 .../src/test/resources/log4j2.xml                  |  32 +++++
 11 files changed, 334 insertions(+), 33 deletions(-)

diff --git a/tika-parent/pom.xml b/tika-parent/pom.xml
index e9f7579..04148f1 100644
--- a/tika-parent/pom.xml
+++ b/tika-parent/pom.xml
@@ -398,6 +398,13 @@
         <version>0.9.1.2</version>
       </dependency>
       <dependency>
+        <groupId>com.azure</groupId>
+        <artifactId>azure-storage-blob</artifactId>
+        <!-- 12.15.0 introduces reams of convergence errors with
+             all sorts of netty components -->
+        <version>12.14.4</version>
+      </dependency>
+      <dependency>
         <groupId>com.fasterxml.jackson.core</groupId>
         <artifactId>jackson-annotations</artifactId>
         <version>${jackson.version}</version>
diff --git a/tika-pipes/tika-emitters/tika-emitter-az-blob/pom.xml b/tika-pipes/tika-emitters/tika-emitter-az-blob/pom.xml
index 57a83f7..1857fe6 100644
--- a/tika-pipes/tika-emitters/tika-emitter-az-blob/pom.xml
+++ b/tika-pipes/tika-emitters/tika-emitter-az-blob/pom.xml
@@ -46,7 +46,6 @@
     <dependency>
       <groupId>com.azure</groupId>
       <artifactId>azure-storage-blob</artifactId>
-      <version>12.14.4</version>
     </dependency>
   </dependencies>
 
@@ -58,7 +57,7 @@
         <configuration>
           <archive>
             <manifestEntries>
-              <Automatic-Module-Name>org.apache.tika.pipes.emitter.gcs</Automatic-Module-Name>
+              <Automatic-Module-Name>org.apache.tika.pipes.emitter.azblob</Automatic-Module-Name>
             </manifestEntries>
           </archive>
         </configuration>
diff --git a/tika-pipes/tika-fetchers/tika-fetcher-az-blob/pom.xml b/tika-pipes/tika-fetchers/tika-fetcher-az-blob/pom.xml
index 44eddcf..4fe59c8 100644
--- a/tika-pipes/tika-fetchers/tika-fetcher-az-blob/pom.xml
+++ b/tika-pipes/tika-fetchers/tika-fetcher-az-blob/pom.xml
@@ -38,7 +38,6 @@
         <dependency>
             <groupId>com.azure</groupId>
             <artifactId>azure-storage-blob</artifactId>
-            <version>12.14.4</version>
         </dependency>
         <dependency>
             <groupId>${project.groupId}</groupId>
@@ -63,7 +62,7 @@
                 <configuration>
                     <archive>
                         <manifestEntries>
-                            <Automatic-Module-Name>org.apache.tika.pipes.fetcher.gcs</Automatic-Module-Name>
+                            <Automatic-Module-Name>org.apache.tika.pipes.fetcher.azblob</Automatic-Module-Name>
                         </manifestEntries>
                     </archive>
                 </configuration>
diff --git a/tika-pipes/tika-pipes-iterators/pom.xml b/tika-pipes/tika-pipes-iterators/pom.xml
index 9f7b41e..de1ed74 100644
--- a/tika-pipes/tika-pipes-iterators/pom.xml
+++ b/tika-pipes/tika-pipes-iterators/pom.xml
@@ -39,6 +39,7 @@
     <module>tika-pipes-iterator-s3</module>
     <module>tika-pipes-iterator-solr</module>
     <module>tika-pipes-iterator-gcs</module>
+    <module>tika-pipes-iterator-az-blob</module>
   </modules>
 
   <scm>
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-gcs/pom.xml b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-az-blob/pom.xml
similarity index 93%
copy from tika-pipes/tika-pipes-iterators/tika-pipes-iterator-gcs/pom.xml
copy to tika-pipes/tika-pipes-iterators/tika-pipes-iterator-az-blob/pom.xml
index 50c8445..cc9fb00 100644
--- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-gcs/pom.xml
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-az-blob/pom.xml
@@ -26,9 +26,9 @@
   </parent>
   <modelVersion>4.0.0</modelVersion>
 
-  <artifactId>tika-pipes-iterator-gcs</artifactId>
+  <artifactId>tika-pipes-iterator-az-blob</artifactId>
 
-  <name>Apache Tika Pipes Iterator - Google Cloud Storage</name>
+  <name>Apache Tika Pipes Iterator - Azure Blob Storage</name>
   <url>https://tika.apache.org/</url>
 
   <dependencies>
@@ -39,9 +39,8 @@
       <scope>provided</scope>
     </dependency>
     <dependency>
-      <groupId>com.google.cloud</groupId>
-      <artifactId>google-cloud-storage</artifactId>
-      <version>${google.cloud.version}</version>
+      <groupId>com.azure</groupId>
+      <artifactId>azure-storage-blob</artifactId>
     </dependency>
   </dependencies>
   <build>
@@ -52,7 +51,7 @@
         <configuration>
           <archive>
             <manifestEntries>
-              <Automatic-Module-Name>org.apache.tika.pipes.pipesiterator.s3</Automatic-Module-Name>
+              <Automatic-Module-Name>org.apache.tika.pipes.pipesiterator.azblob</Automatic-Module-Name>
             </manifestEntries>
           </archive>
         </configuration>
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-az-blob/src/main/java/org/apache/tika/pipes/pipesiterator/azblob/AZBlobPipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-az-blob/src/main/java/org/apache/tika/pipes/pipesiterator/azblob/AZBlobPipesIterator.java
new file mode 100644
index 0000000..b207800
--- /dev/null
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-az-blob/src/main/java/org/apache/tika/pipes/pipesiterator/azblob/AZBlobPipesIterator.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.pipesiterator.azblob;
+
+import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
+import com.azure.core.http.rest.PagedIterable;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.BlobListDetails;
+import com.azure.storage.blob.models.ListBlobsOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.config.Field;
+import org.apache.tika.config.Initializable;
+import org.apache.tika.config.InitializableProblemHandler;
+import org.apache.tika.config.Param;
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.HandlerConfig;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.pipesiterator.PipesIterator;
+import org.apache.tika.utils.StringUtils;
+
+/**
+ * Pipes iterator that lists the blobs in one Azure Blob Storage container
+ * (optionally restricted to a name prefix) and enqueues a {@link FetchEmitTuple}
+ * for each listed blob, using the blob name as both the fetch key and the
+ * emit key. Zero-length blobs and virtual-directory entries are skipped.
+ * <p>
+ * Authentication is currently via SAS token only.
+ */
+public class AZBlobPipesIterator extends PipesIterator implements Initializable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(AZBlobPipesIterator.class);
+
+    private BlobServiceClient blobServiceClient;
+    private BlobContainerClient blobContainerClient;
+    private String prefix = "";
+    private String container = "";
+    private String sasToken;
+    private String endpoint;
+    //timeout for the blob listing request
+    private long timeoutMillis = 360000;
+
+    @Field
+    public void setSasToken(String sasToken) {
+        this.sasToken = sasToken;
+    }
+
+    @Field
+    public void setEndpoint(String endpoint) {
+        this.endpoint = endpoint;
+    }
+
+    @Field
+    public void setContainer(String container) {
+        this.container = container;
+    }
+
+    /**
+     * Restricts the listing to blob names starting with this prefix. The value is
+     * used verbatim: a trailing "/" is kept so that "docs/" does not also match
+     * "docs-archive/...". A {@code null} or blank prefix lists the whole container.
+     */
+    @Field
+    public void setPrefix(String prefix) {
+        this.prefix = prefix == null ? "" : prefix;
+    }
+
+    /**
+     * Timeout in milliseconds for the blob listing request; defaults to 360000.
+     */
+    @Field
+    public void setTimeoutMillis(long timeoutMillis) {
+        this.timeoutMillis = timeoutMillis;
+    }
+
+    @Override
+    protected void enqueue() throws InterruptedException, IOException, TimeoutException {
+        String fetcherName = getFetcherName();
+        String emitterName = getEmitterName();
+        long start = System.currentTimeMillis();
+        int count = 0;
+        HandlerConfig handlerConfig = getHandlerConfig();
+
+        ListBlobsOptions options = new ListBlobsOptions()
+                .setDetails(
+                        new BlobListDetails()
+                                .setRetrieveDeletedBlobs(false)
+                                .setRetrieveMetadata(false)
+                                .setRetrieveSnapshots(false));
+        if (!StringUtils.isBlank(prefix)) {
+            options.setPrefix(prefix);
+        }
+        PagedIterable<BlobItem> blobs = blobContainerClient.listBlobs(options,
+                Duration.ofMillis(timeoutMillis));
+
+        for (BlobItem blob : blobs) {
+            if (shouldSkip(blob)) {
+                continue;
+            }
+            long elapsed = System.currentTimeMillis() - start;
+            LOGGER.debug("adding ({}) {} in {} ms", count, blob.getName(), elapsed);
+            //TODO -- extract metadata about content length etc from properties
+            tryToAdd(new FetchEmitTuple(blob.getName(), new FetchKey(fetcherName,
+                    blob.getName()),
+                    new EmitKey(emitterName, blob.getName()), new Metadata(), handlerConfig,
+                    getOnParseException()));
+            count++;
+        }
+        long elapsed = System.currentTimeMillis() - start;
+        LOGGER.info("finished enqueuing {} files in {} ms", count, elapsed);
+    }
+
+    /**
+     * Returns true for listing entries that should not be enqueued: null items,
+     * items without properties, virtual-directory prefixes and zero-length blobs.
+     */
+    private static boolean shouldSkip(BlobItem blob) {
+        if (blob == null || blob.getProperties() == null) {
+            return true;
+        }
+        //isPrefix() and getContentLength() return boxed values that can be null;
+        //compare null-safely rather than auto-unboxing, which throws an NPE
+        if (Boolean.TRUE.equals(blob.isPrefix())) {
+            return true;
+        }
+        Long contentLength = blob.getProperties().getContentLength();
+        return contentLength != null && contentLength == 0L;
+    }
+
+    @Override
+    public void initialize(Map<String, Param> params) throws TikaConfigException {
+        //TODO -- allow authentication via other methods
+        try {
+            blobServiceClient = new BlobServiceClientBuilder()
+                    .endpoint(endpoint)
+                    .sasToken(sasToken)
+                    .buildClient();
+            blobContainerClient = blobServiceClient.getBlobContainerClient(container);
+        } catch (RuntimeException e) {
+            //e.g. a malformed endpoint; report it as a configuration problem
+            throw new TikaConfigException("couldn't build Azure blob client for endpoint '" +
+                    endpoint + "' and container '" + container + "'", e);
+        }
+    }
+
+    @Override
+    public void checkInitialization(InitializableProblemHandler problemHandler)
+            throws TikaConfigException {
+        mustNotBeEmpty("sasToken", this.sasToken);
+        mustNotBeEmpty("endpoint", this.endpoint);
+        mustNotBeEmpty("container", this.container);
+    }
+}
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-az-blob/src/test/java/org/apache/tika/pipes/pipesiterator/azblob/TestAZBlobPipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-az-blob/src/test/java/org/apache/tika/pipes/pipesiterator/azblob/TestAZBlobPipesIterator.java
new file mode 100644
index 0000000..f090ac2
--- /dev/null
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-az-blob/src/test/java/org/apache/tika/pipes/pipesiterator/azblob/TestAZBlobPipesIterator.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.pipesiterator.azblob;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.pipesiterator.PipesIterator;
+
+/**
+ * Not yet a real unit test: the container, endpoint and SAS token below are
+ * blank placeholders that must be filled in to run against a live account.
+ */
+@Disabled("turn into an actual unit test")
+public class TestAZBlobPipesIterator {
+
+    @Test
+    public void testSimple() throws Exception {
+        AZBlobPipesIterator it = new AZBlobPipesIterator();
+        it.setContainer("");
+        it.setEndpoint("");
+        it.setSasToken("");
+        it.initialize(Collections.emptyMap());
+        int numConsumers = 2;
+        ArrayBlockingQueue<FetchEmitTuple> queue = new ArrayBlockingQueue<>(10);
+
+        ExecutorService es = Executors.newFixedThreadPool(numConsumers + 1);
+        ExecutorCompletionService<Integer> c = new ExecutorCompletionService<>(es);
+        List<MockFetcher> fetchers = new ArrayList<>();
+        for (int i = 0; i < numConsumers; i++) {
+            MockFetcher fetcher = new MockFetcher(queue);
+            fetchers.add(fetcher);
+            c.submit(fetcher);
+        }
+        //put() waits for space; offer() would silently drop tuples once the queue is full
+        for (FetchEmitTuple t : it) {
+            queue.put(t);
+        }
+        for (int i = 0; i < numConsumers; i++) {
+            queue.put(PipesIterator.COMPLETED_SEMAPHORE);
+        }
+        int finished = 0;
+        int completed = 0;
+        try {
+            while (finished < numConsumers) {
+                Future<Integer> f = c.take();
+                completed += f.get();
+                finished++;
+            }
+        } finally {
+            es.shutdownNow();
+        }
+        //expects the configured container to yield exactly one tuple
+        assertEquals(1, completed);
+    }
+
+    private static class MockFetcher implements Callable<Integer> {
+        private final ArrayBlockingQueue<FetchEmitTuple> queue;
+        private final List<FetchEmitTuple> pairs = new ArrayList<>();
+
+        private MockFetcher(ArrayBlockingQueue<FetchEmitTuple> queue) {
+            this.queue = queue;
+        }
+
+        @Override
+        public Integer call() throws Exception {
+            while (true) {
+                FetchEmitTuple t = queue.poll(1, TimeUnit.HOURS);
+                if (t == null) {
+                    throw new TimeoutException("no tuple or completion semaphore within 1 hour");
+                }
+                if (t == PipesIterator.COMPLETED_SEMAPHORE) {
+                    return pairs.size();
+                }
+                pairs.add(t);
+            }
+        }
+    }
+}
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-az-blob/src/test/resources/log4j2.xml b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-az-blob/src/test/resources/log4j2.xml
new file mode 100644
index 0000000..c88e66e
--- /dev/null
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-az-blob/src/test/resources/log4j2.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<!-- Test logging: events at INFO and above go to stderr. -->
+<Configuration status="WARN">
+  <Appenders>
+    <Console name="Console" target="SYSTEM_ERR">
+      <PatternLayout pattern="%-5p [%t] %d{HH:mm:ss,SSS} %c %m%n"/>
+    </Console>
+  </Appenders>
+  <Loggers>
+    <Root level="info">
+      <AppenderRef ref="Console"/>
+    </Root>
+  </Loggers>
+</Configuration>
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-gcs/pom.xml b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-gcs/pom.xml
index 50c8445..551a053 100644
--- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-gcs/pom.xml
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-gcs/pom.xml
@@ -52,7 +52,7 @@
         <configuration>
           <archive>
             <manifestEntries>
-              <Automatic-Module-Name>org.apache.tika.pipes.pipesiterator.s3</Automatic-Module-Name>
+              <Automatic-Module-Name>org.apache.tika.pipes.pipesiterator.gcs</Automatic-Module-Name>
             </manifestEntries>
           </archive>
         </configuration>
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/test/resources/log4j.properties b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/test/resources/log4j.properties
deleted file mode 100644
index 2b2da1a..0000000
--- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,22 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#info,debug, error,fatal ...
-log4j.rootLogger=info,stderr
-#console
-log4j.appender.stderr=org.apache.log4j.ConsoleAppender
-log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
-log4j.appender.stderr.Target=System.err
-log4j.appender.stderr.layout.ConversionPattern=%-5p [%t]: %m%n
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/test/resources/log4j2.xml b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/test/resources/log4j2.xml
new file mode 100644
index 0000000..c88e66e
--- /dev/null
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/test/resources/log4j2.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<!-- Test logging: events at INFO and above go to stderr. -->
+<Configuration status="WARN">
+  <Appenders>
+    <Console name="Console" target="SYSTEM_ERR">
+      <PatternLayout pattern="%-5p [%t] %d{HH:mm:ss,SSS} %c %m%n"/>
+    </Console>
+  </Appenders>
+  <Loggers>
+    <Root level="info">
+      <AppenderRef ref="Console"/>
+    </Root>
+  </Loggers>
+</Configuration>